How-to — TCP Servers

Note

This page uses two different API variants:

  • Synchronous API with classic def functions, usable in any context.

  • Asynchronous API with async def functions, using an asynchronous framework to perform I/O operations.

All asynchronous API examples assume that you are using asyncio, but you can use a different library thanks to the asynchronous backend engine API.


Introduction

The easynetwork.api_async.server module simplifies the task of writing network servers. The service creation model is inspired by the standard socketserver library, but it is an enhanced version with a higher level of abstraction.

Creating a server requires several steps:

  1. Derive a class from AsyncStreamRequestHandler and redefine its handle() method; this method will process incoming requests.

  2. Instantiate the AsyncTCPNetworkServer class passing it the server’s address, the protocol object and the request handler instance.

  3. Call serve_forever() to process requests.

Writing coroutine functions is mandatory to use this server.
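
In outline, the three steps fit together as in the condensed sketch below. It reuses the Request, Response and ServerProtocol names introduced in the examples later on this page; the complete, runnable program is shown in the Server Object section at the bottom.

# Step 1: a request handler whose handle() method processes incoming requests.
class MyRequestHandler(AsyncStreamRequestHandler[Request, Response]):
    async def handle(
        self,
        client: AsyncStreamClient[Response],
    ) -> AsyncGenerator[None, Request]:
        request: Request = yield
        await client.send_packet(Response())


async def main() -> None:
    # Step 2: instantiate the server with the address, the protocol object
    # and the request handler instance.
    server = AsyncTCPNetworkServer("localhost", 9000, ServerProtocol(), MyRequestHandler())

    async with server:
        # Step 3: process requests.
        await server.serve_forever()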

See also

PEP 492 — Coroutines with async and await syntax

The proposal to introduce native coroutines in Python with async and await syntax.

asyncio — Asynchronous I/O

If you are not familiar with async/await syntax, you can use the standard library to get started with coroutines.

Request Handler Objects

Note

Unlike socketserver.BaseRequestHandler, which is instantiated once per connection, there is only one AsyncStreamRequestHandler instance for the entire service.

Here is a simple example:

from __future__ import annotations

from collections.abc import AsyncGenerator

from easynetwork.api_async.server import AsyncStreamClient, AsyncStreamRequestHandler


class Request:
    """Object representing the client request."""

    ...


class Response:
    """Object representing the response to send to the client."""

    ...


class MyRequestHandler(AsyncStreamRequestHandler[Request, Response]):
    """
    The request handler class for our server.

    It is instantiated once per server, and must
    override the handle() method to implement communication with the
    client.
    """

    async def handle(
        self,
        client: AsyncStreamClient[Response],
    ) -> AsyncGenerator[None, Request]:
        # "client" is the write stream of the connection to the remote host.
        # The read stream is handled by the server, and the incoming
        # request is sent through the "yield" statement.
        request: Request = yield

        # Do some stuff
        ...

        response = Response()

        await client.send_packet(response)

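Because there is a single handler instance, any attribute you set on self is shared by every connected client; state that belongs to one connection is better kept in local variables of handle(), since each connection runs its own generator. The sketch below is a variant of the handler above that illustrates this distinction with a hypothetical request counter (total_requests is not part of the API, just an example attribute):

class MyRequestHandler(AsyncStreamRequestHandler[Request, Response]):
    # Shared by all clients: there is one handler instance for the whole service.
    total_requests: int = 0

    async def handle(
        self,
        client: AsyncStreamClient[Response],
    ) -> AsyncGenerator[None, Request]:
        # Per-client state: local variables are not shared between clients.
        requests_from_this_client = 0

        request: Request = yield

        self.total_requests += 1
        requests_from_this_client += 1

        await client.send_packet(Response())
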
Using handle() Generator

See also

PEP 525 — Asynchronous Generators

The proposal that expanded on PEP 492 by adding generator capabilities to coroutine functions.

Minimum Requirements

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    ### Before 'yield'
    # Initializes the generator.
    # This is the setup part before receiving a request.
    # There is not much to do here.
    ##################

    request: Request = yield

    ### After 'yield'
    # Once the server has sent you the client's request,
    # you can do whatever you want with it and send responses back
    # to the client if necessary.
    await client.send_packet(Response())
    #################

    ### On a 'return'
    # When handle() returns, it means this request handler is finished.
    # It does not close the connection.
    # The server immediately creates a new generator.
    #################
    return

Closing the connection

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    await client.send_packet(Response())

    # Closing the client closes the transport; the server
    # will not create a new generator afterwards.
    await client.aclose()

Tip

You can use contextlib.aclosing() to close the client when the generator exits.

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    async with contextlib.aclosing(client):
        request: Request = yield

        await client.send_packet(Response())

Important

The connection is forcibly closed under the following conditions:

  • handle() raises an exception.

  • handle() returns before the first yield statement.

    async def handle(
        self,
        client: AsyncStreamClient[Response],
    ) -> AsyncGenerator[None, Request]:
        if not self.should_handle(client):
            return

        request: Request = yield

Error Handling

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    try:
        # *All* exceptions are thrown through the "yield" statement
        # (including BaseException). But you should only catch Exception subclasses.
        request: Request = yield
    except StreamProtocolParseError:
        await client.send_packet(BadRequest())
    except OSError:
        # It is possible that something went wrong with the underlying
        # transport (the socket) at the OS level.
        # You should check whether the client is still usable.
        try:
            await client.send_packet(InternalError())
        except OSError:
            await client.aclose()
            raise
    except Exception:
        await client.send_packet(InternalError())
    else:
        await client.send_packet(Response())

Note

handle() will never get a ConnectionError subclass. In case of an unexpected disconnect, the generator is closed, so you should handle GeneratorExit instead.
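
One way to react to such a close is to wrap the yield in a try ... finally block (or catch GeneratorExit explicitly), so that clean-up code runs even when the peer disconnects unexpectedly. A minimal sketch, where cleanup_session() is a hypothetical helper and not part of the API:

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    try:
        request: Request = yield

        ...

        await client.send_packet(Response())
    finally:
        # Also runs when the generator is closed after an unexpected
        # disconnect: GeneratorExit is raised at the "yield" point and
        # propagates through this block.
        self.cleanup_session(client)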

Having Multiple yield Statements

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    ...

    await client.send_packet(Response())

    if self.need_something_else(request, client):
        additional_data: Request = yield

        ...

        await client.send_packet(Response())

Tip

The number of yield statements allowed is… infinite!

You can take advantage of this by having an internal main loop inside the generator:

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    # Close the client when the loop ends
    async with contextlib.aclosing(client):
        # Ask the user to log in
        initial_user_info: Request = yield

        ...

        # Successfully logged in
        await client.send_packet(Response())

        # Start handling requests
        while not client.is_closing():
            request: Request = yield

            ...

            await client.send_packet(Response())

Cancellation And Timeouts

Since all BaseException subclasses are thrown into the generator, you can apply a timeout to the read stream using the asynchronous framework (the cancellation exception is delivered inside the generator, at the yield point):

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    try:
        async with asyncio.timeout(30):
            # The client has 30 seconds to send the request to the server.
            request: Request = yield
    except TimeoutError:
        await client.send_packet(TimedOut())
    else:
        await client.send_packet(Response())

Connecting/Disconnecting Hooks

You can override the on_connection() and on_disconnection() methods:

async def on_connection(self, client: AsyncStreamClient[Response]) -> None:
    print(f"{client!r} is connected")

    # Notify the client that the service is ready.
    await client.send_packet(Response())

async def on_disconnection(self, client: AsyncStreamClient[Response]) -> None:
    # Perform service shutdown clean-up
    ...

    print(f"{client!r} is disconnected")

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    ...

    await client.send_packet(Response())

Wait For Client Data On Connection

If you need to use the read stream, on_connection() can be an asynchronous generator instead of a coroutine function:

async def on_connection(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    # Ask the user to log in
    initial_user_info: Request = yield

    ...

    # Successfully logged in
    await client.send_packet(Response())

async def on_disconnection(self, client: AsyncStreamClient[Response]) -> None:
    # Perform log-out clean-up
    ...

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    ...

    await client.send_packet(Response())

Service Initialization

The server will call service_init() and pass it an AsyncExitStack at the beginning of the serve_forever() task to set up the global service.

This allows you to do something like this:

async def service_init(
    self,
    exit_stack: contextlib.AsyncExitStack,
    server: AsyncTCPNetworkServer[Request, Response],
) -> None:
    exit_stack.callback(self._service_quit)

    self.background_tasks = await exit_stack.enter_async_context(asyncio.TaskGroup())

    _ = self.background_tasks.create_task(self._service_actions())

async def _service_actions(self) -> None:
    while True:
        await asyncio.sleep(1)

        # Do some stuff each second in the background
        ...

def _service_quit(self) -> None:
    print("Service stopped")

Server Object

A basic example of how to run the server:

from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator

from easynetwork.api_async.server import AsyncStreamClient, AsyncStreamRequestHandler, AsyncTCPNetworkServer
from easynetwork.protocol import StreamProtocol


class Request:
    ...


class Response:
    ...


class MyRequestHandler(AsyncStreamRequestHandler[Request, Response]):
    async def handle(
        self,
        client: AsyncStreamClient[Response],
    ) -> AsyncGenerator[None, Request]:
        request: Request = yield

        ...

        await client.send_packet(Response())


# NOTE: The sent packet is "Response" and the received packet is "Request"
class ServerProtocol(StreamProtocol[Response, Request]):
    def __init__(self) -> None:
        ...


async def main() -> None:
    host, port = "localhost", 9000
    protocol = ServerProtocol()
    handler = MyRequestHandler()

    # Create the server, binding to localhost on port 9000
    async with AsyncTCPNetworkServer(host, port, protocol, handler) as server:
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl-C
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())

See also

An Echo Client/Server Over TCP

A working example of the server implementation.