How-to — TCP Servers
Note
This page uses two different API variants:
- Synchronous API with classic def functions, usable in any context.
- Asynchronous API with async def functions, using an asynchronous framework to perform I/O operations.
All asynchronous API examples assume that you are using asyncio, but you can use a different library thanks to the asynchronous backend engine API.
Introduction
The easynetwork.servers module simplifies the task of writing network servers. The service creation model is inspired by the standard socketserver library, but is an enhanced version with even more abstraction.
Creating a server requires several steps:
- Derive a class from AsyncStreamRequestHandler and redefine its handle() method; this method will process incoming requests.
- Instantiate the AsyncTCPNetworkServer class, passing it the server's address, the protocol object and the request handler instance.
- Call serve_forever() to process requests.
Writing coroutine functions is mandatory to use this server.
See also
- PEP 492 — Coroutines with async and await syntax
The proposal to introduce native coroutines in Python with async and await syntax.
- asyncio — Asynchronous I/O
If you are not familiar with async/await syntax, you can use the standard library to get started with coroutines.
Request Handler Objects
Note
Unlike socketserver.BaseRequestHandler, there is only one AsyncStreamRequestHandler instance for the entire service.
Here is a simple example:
from __future__ import annotations

from collections.abc import AsyncGenerator

from easynetwork.servers.handlers import AsyncStreamClient, AsyncStreamRequestHandler


class Request:
    """Object representing the client request."""

    ...


class Response:
    """Object representing the response to send to the client."""

    ...


class MyRequestHandler(AsyncStreamRequestHandler[Request, Response]):
    """
    The request handler class for our server.

    It is instantiated once for the server, and must
    override the handle() method to implement communication with the
    client.
    """

    async def handle(
        self,
        client: AsyncStreamClient[Response],
    ) -> AsyncGenerator[None, Request]:
        # "client" is the write stream of the connection to the remote host.
        # The read stream is covered by the server and the incoming
        # request is sent through the "yield" statement.
        request: Request = yield

        # Do some stuff
        ...

        response = Response()

        await client.send_packet(response)
Using handle() Generator
Minimum Requirements
async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    ### Before 'yield'
    # Initializes the generator.
    # This is the setup part before receiving a request.
    # There is usually not much to do here.
    ##################

    request: Request = yield

    ### After 'yield'
    # Once the server has sent you the client's request,
    # you can do whatever you want with it and send responses back
    # to the client if necessary.
    await client.send_packet(Response())
    #################

    ### On a 'return'
    # When handle() returns, it means that this request handler is finished.
    # It does not close the connection or anything.
    # The server immediately creates a new generator.
    #################
    return
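To make the lifecycle above concrete, here is a minimal, library-free sketch of how a server loop might drive such a generator. The drive() function and the string packets are hypothetical stand-ins chosen for illustration; this is not EasyNetwork's actual implementation.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator


async def handle(sent: list[str]) -> AsyncGenerator[None, str]:
    # Before 'yield': setup part.
    request: str = yield
    # After 'yield': process the request and "send" a response.
    sent.append(f"response to {request}")
    # Returning ends this generator only; the connection stays open.


async def drive(requests: list[str]) -> tuple[list[str], int]:
    """Hypothetical server loop: feeds requests to handle() generators."""
    sent: list[str] = []
    pending = list(requests)
    generators_created = 0
    while pending:
        gen = handle(sent)
        generators_created += 1
        await gen.asend(None)  # run the generator up to its first 'yield'
        try:
            while pending:
                # Resume the generator with the next request.
                await gen.asend(pending.pop(0))
        except StopAsyncIteration:
            pass  # handle() returned: the outer loop creates a new generator
        finally:
            await gen.aclose()
    return sent, generators_created


if __name__ == "__main__":
    print(asyncio.run(drive(["a", "b"])))
```

In this model, a handle() with a single yield statement means one generator per request, which mirrors the "the server immediately creates a new generator" comment above.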
Closing the connection
async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    await client.send_packet(Response())

    # Close the transport. After this call, the server
    # will not create a new generator.
    await client.aclose()
Tip
You can use contextlib.aclosing() to close the client at the generator exit.
import contextlib

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    async with contextlib.aclosing(client):
        request: Request = yield

        await client.send_packet(Response())
Important
The connection is forcibly closed under the following conditions:
- handle() raises an exception.
- handle() returns before the first yield statement.
async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    if not self.should_handle(client):
        return

    request: Request = yield
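The second condition can be detected by whoever drives the generator, because a generator that returns before its first yield raises StopAsyncIteration as soon as it is started. The sketch below shows that plain-Python behavior; the accept flag and the starts_waiting() helper are hypothetical and only illustrate the mechanism.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator


async def handle(accept: bool) -> AsyncGenerator[None, str]:
    if not accept:
        return  # returns before the first 'yield'
    request: str = yield


async def starts_waiting(accept: bool) -> bool:
    """Return True if handle() reached its first 'yield'."""
    gen = handle(accept)
    try:
        await gen.asend(None)  # start the generator
    except StopAsyncIteration:
        # The generator finished without ever asking for a request.
        return False
    await gen.aclose()
    return True


if __name__ == "__main__":
    print(asyncio.run(starts_waiting(True)), asyncio.run(starts_waiting(False)))
```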
Error Handling
import traceback

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    try:
        # *All* exceptions are thrown through the "yield" statement
        # (including BaseException). But you should only catch Exception subclasses.
        request: Request = yield
    except StreamProtocolParseError:
        await client.send_packet(BadRequest())
    except OSError:
        # It is possible that something went wrong with the underlying
        # transport (the socket) at the OS level.
        # You should check if the client is still usable.
        try:
            await client.send_packet(InternalError())
        except OSError:
            await client.aclose()
            raise
    except Exception:
        # Most likely a bug in EasyNetwork code. Log the error.
        traceback.print_exc()

        await client.send_packet(InternalError())
    else:
        await client.send_packet(Response())
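The "thrown through the yield statement" behavior relies on the standard athrow() method of asynchronous generators. Here is a small sketch of that mechanism, assuming a hypothetical ParseError class standing in for StreamProtocolParseError and a hypothetical deliver() helper playing the role of the server.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator


class ParseError(Exception):
    """Hypothetical stand-in for StreamProtocolParseError."""


async def handle(sent: list[str]) -> AsyncGenerator[None, str]:
    try:
        request: str = yield
    except ParseError:
        sent.append("bad request")
    else:
        sent.append(f"ok: {request}")


async def deliver(outcome: str | Exception) -> list[str]:
    """Send either a request or an error into a fresh generator."""
    sent: list[str] = []
    gen = handle(sent)
    await gen.asend(None)  # run up to the 'yield'
    try:
        if isinstance(outcome, Exception):
            # The exception is raised at the 'yield' inside handle().
            await gen.athrow(outcome)
        else:
            await gen.asend(outcome)
    except StopAsyncIteration:
        pass  # handle() returned normally
    return sent


if __name__ == "__main__":
    print(asyncio.run(deliver("ping")))
    print(asyncio.run(deliver(ParseError())))
```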
Note
handle() will never get a ConnectionError subclass. In case of an unexpected disconnect, the generator is closed, so you should handle GeneratorExit instead.
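Closing an asynchronous generator raises GeneratorExit at the suspended yield, which is what makes the clean-up described in the note possible. The following sketch assumes the closing is done with the standard aclose() method; the events list and simulate_disconnect() are hypothetical names used only for the demonstration.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator


async def handle(events: list[str]) -> AsyncGenerator[None, str]:
    try:
        request: str = yield
        events.append(f"request: {request}")
    except GeneratorExit:
        # The generator was closed while waiting for a request.
        events.append("client disconnected")
        raise  # always let GeneratorExit propagate


async def simulate_disconnect() -> list[str]:
    events: list[str] = []
    gen = handle(events)
    await gen.asend(None)  # handle() is now waiting at the 'yield'
    await gen.aclose()  # raises GeneratorExit inside handle()
    return events


if __name__ == "__main__":
    print(asyncio.run(simulate_disconnect()))
```

Note that a generator being closed must not yield again, so the except block should only perform clean-up and then re-raise.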
Warning
You should always log or re-raise a bare Exception thrown in your generator.
except Exception:
    # Most likely a bug in EasyNetwork code. Log the error.
    traceback.print_exc()

    await client.send_packet(InternalError())
Having Multiple yield Statements
async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    ...

    await client.send_packet(Response())

    if self.need_something_else(request, client):
        additional_data: Request = yield

        ...

        await client.send_packet(Response())
Tip
There is no limit on the number of yield statements.
You can take advantage of this by having an internal main loop inside the generator:
import contextlib

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    # Close the client at the loop break
    async with contextlib.aclosing(client):
        # Ask the user to log in
        initial_user_info: Request = yield

        ...

        # Successfully logged in
        await client.send_packet(Response())

        # Start handling requests
        while not client.is_closing():
            request: Request = yield

            ...

            await client.send_packet(Response())
Cancellation And Timeouts
The generator can yield a timeout delay, which tells the parent task how long to wait for the next request:
async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[float | None, Request]:
    try:
        # The client has 30 seconds to send the request to the server.
        request: Request = yield 30.0
    except TimeoutError:
        await client.send_packet(TimedOut())
    else:
        await client.send_packet(Response())
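As a rough model of this exchange, the sketch below shows a driver that reads the yielded delay and applies it while waiting for data. It assumes a hypothetical drive() function and uses an asyncio.Queue as a stand-in for the read stream; the real server logic may differ.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator


async def handle(sent: list[str]) -> AsyncGenerator[float | None, str]:
    try:
        # Ask the driver to wait at most 0.05 seconds for the request.
        request: str = yield 0.05
    except TimeoutError:
        sent.append("timed out")
    else:
        sent.append(f"ok: {request}")


async def drive(queue: asyncio.Queue) -> list[str]:
    """Hypothetical parent task: honors the delay yielded by handle()."""
    sent: list[str] = []
    gen = handle(sent)
    timeout = await gen.asend(None)  # the value given to 'yield'
    try:
        try:
            request = await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError:
            # Report the timeout at the 'yield' inside handle().
            await gen.athrow(TimeoutError())
        else:
            await gen.asend(request)
    except StopAsyncIteration:
        pass  # handle() returned
    return sent


async def demo(send_in_time: bool) -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    if send_in_time:
        queue.put_nowait("ping")
    return await drive(queue)


if __name__ == "__main__":
    print(asyncio.run(demo(True)), asyncio.run(demo(False)))
```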
Since all BaseException subclasses are thrown into the generator, you can also apply a timeout to the read stream using the asynchronous framework (the cancellation exception is raised inside the generator):
import asyncio

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    try:
        # asyncio.timeout() requires Python 3.11 or later.
        async with asyncio.timeout(30):
            # The client has 30 seconds to send the request to the server.
            request: Request = yield
    except TimeoutError:
        await client.send_packet(TimedOut())
    else:
        await client.send_packet(Response())
Warning
Note that this behavior works because, in the current implementation, the generator is always executed and closed in the same asynchronous task.
This feature is available so that features like anyio.CancelScope can be used.
However, it may be removed in a future release.
Connecting/Disconnecting Hooks
You can override the on_connection() and on_disconnection() methods:
- on_connection() is called on client task startup.
- on_disconnection() is called on client task teardown.
async def on_connection(self, client: AsyncStreamClient[Response]) -> None:
    print(f"{client!r} is connected")

    # Notify the client that the service is ready.
    await client.send_packet(Response())

async def on_disconnection(self, client: AsyncStreamClient[Response]) -> None:
    # Perform service shutdown clean-up
    ...

    print(f"{client!r} is disconnected")

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    ...

    await client.send_packet(Response())
Wait For Client Data On Connection
If you need to use the read stream, on_connection() can be an asynchronous generator instead of a coroutine function:
async def on_connection(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    # Ask the user to log in
    initial_user_info: Request = yield

    ...

    # Successfully logged in
    await client.send_packet(Response())

async def on_disconnection(self, client: AsyncStreamClient[Response]) -> None:
    # Perform log out clean-up
    ...

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    ...

    await client.send_packet(Response())
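One way a caller could support both forms of on_connection() is to inspect what the method returns. The sketch below is only an assumption about how such a dispatch might look, using the standard inspect.isasyncgen() check; the two handler classes, the log list and run_on_connection() are hypothetical.

```python
from __future__ import annotations

import asyncio
import inspect


class CoroutineHandler:
    async def on_connection(self, log: list[str]) -> None:
        log.append("connected")


class GeneratorHandler:
    async def on_connection(self, log: list[str]):
        credentials = yield  # wait for the first packet
        log.append(f"logged in: {credentials}")


async def run_on_connection(handler, first_packet: str) -> list[str]:
    """Hypothetical dispatch between coroutine and async generator hooks."""
    log: list[str] = []
    result = handler.on_connection(log)
    if inspect.isasyncgen(result):
        await result.asend(None)  # run up to the 'yield'
        try:
            await result.asend(first_packet)
        except StopAsyncIteration:
            pass  # the hook is finished
        finally:
            await result.aclose()
    else:
        await result  # plain coroutine: just await it
    return log


if __name__ == "__main__":
    print(asyncio.run(run_on_connection(CoroutineHandler(), "alice")))
    print(asyncio.run(run_on_connection(GeneratorHandler(), "alice")))
```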
Service Initialization
The server will call service_init() and pass it an AsyncExitStack at the beginning of the serve_forever() task to set up the global service.
This allows you to do something like this:
import asyncio
import contextlib

async def service_init(
    self,
    exit_stack: contextlib.AsyncExitStack,
    server: AsyncTCPNetworkServer[Request, Response],
) -> None:
    exit_stack.callback(self._service_quit)

    self.background_tasks = await exit_stack.enter_async_context(asyncio.TaskGroup())

    _ = self.background_tasks.create_task(self._service_actions())

async def _service_actions(self) -> None:
    while True:
        await asyncio.sleep(1)

        # Do some stuff each second in background
        ...

def _service_quit(self) -> None:
    print("Service stopped")
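Callbacks registered on an AsyncExitStack run in reverse order of registration when the stack is closed. The sketch below illustrates that ordering with the standard library only; it replaces asyncio.TaskGroup with a manually cancelled task so that it does not depend on Python 3.11, and the serve(), background() and stop() names are hypothetical.

```python
from __future__ import annotations

import asyncio
import contextlib


async def background(log: list[str]) -> None:
    log.append("background started")
    try:
        await asyncio.sleep(3600)  # pretend to do periodic work
    finally:
        log.append("background stopped")


async def stop(task: asyncio.Task) -> None:
    # Cancel the background task and wait until it has finished.
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task


async def serve() -> list[str]:
    log: list[str] = []
    async with contextlib.AsyncExitStack() as exit_stack:
        # Roughly what a service_init() implementation could register.
        exit_stack.callback(log.append, "service stopped")
        task = asyncio.create_task(background(log))
        exit_stack.push_async_callback(stop, task)

        await asyncio.sleep(0)  # let the background task start
        log.append("serving")  # stands in for the serving phase
    return log


if __name__ == "__main__":
    print(asyncio.run(serve()))
```

Because the stack unwinds last-in, first-out, the background task is stopped before the "service stopped" callback runs.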
Per-client variables (contextvars integration)
If your asynchronous framework supports per-task context variables, you can use this feature in your request handler:
import contextvars
import logging
from typing import ClassVar

class ClientContextRequestHandler(AsyncStreamRequestHandler[Request, Response]):
    client_addr_var: ClassVar[contextvars.ContextVar[SocketAddress]]
    client_addr_var = contextvars.ContextVar("client_addr")

    @classmethod
    def client_log(cls, message: str) -> None:
        # The address of the currently handled client can be accessed
        # without passing it explicitly to this function.

        logger = logging.getLogger(cls.__name__)

        client_address = cls.client_addr_var.get()

        logger.info("From %s: %s", client_address, message)

    async def on_connection(
        self,
        client: AsyncStreamClient[Response],
    ) -> None:
        address = client.extra(INETClientAttribute.remote_address)
        self.client_addr_var.set(address)

        # Any code called within "handle()" can now get the
        # client's address by calling 'client_addr_var.get()'.

    async def handle(
        self,
        client: AsyncStreamClient[Response],
    ) -> AsyncGenerator[None, Request]:
        request: Request = yield

        self.client_log(f"Received request: {request!r}")

        await client.send_packet(Response())
Tip
It is possible to initialize the context to be copied in service_init().
This means that the contextvars.ContextVar.set() calls made in service_init() will be applied to subsequent client tasks.
Warning
There is no context isolation between handle() calls.
This means that the contextvars.ContextVar.set() calls made in handle() are applied to the whole client task.
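The per-task behavior described in this section comes from asyncio itself: each task runs in a copy of the context that was current when the task was created. Here is a minimal sketch of that rule, with hypothetical variable names and addresses, and with plain tasks standing in for client tasks.

```python
from __future__ import annotations

import asyncio
import contextvars

service_var = contextvars.ContextVar("service", default="<unset>")
client_addr_var = contextvars.ContextVar("client_addr", default="<unset>")


async def client_task(address: str, seen: dict[str, tuple[str, str]]) -> None:
    client_addr_var.set(address)  # like on_connection()
    await asyncio.sleep(0)  # let the other client task run
    # Like handle(): sees this task's own address and the inherited value.
    seen[address] = (service_var.get(), client_addr_var.get())


async def demo() -> tuple[dict[str, tuple[str, str]], str]:
    # Set before the tasks are created, so every task inherits it
    # (comparable to a set() call made in service_init()).
    service_var.set("my-service")

    seen: dict[str, tuple[str, str]] = {}
    await asyncio.gather(
        client_task("10.0.0.1", seen),
        client_task("10.0.0.2", seen),
    )
    # set() calls made inside the tasks do not leak back to the parent.
    return seen, client_addr_var.get()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```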
Server Object
A basic example of how to run the server:
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator

from easynetwork.protocol import StreamProtocol
from easynetwork.servers import AsyncTCPNetworkServer
from easynetwork.servers.handlers import AsyncStreamClient, AsyncStreamRequestHandler


class Request: ...


class Response: ...


class MyRequestHandler(AsyncStreamRequestHandler[Request, Response]):
    async def handle(
        self,
        client: AsyncStreamClient[Response],
    ) -> AsyncGenerator[None, Request]:
        request: Request = yield

        ...

        await client.send_packet(Response())


# NOTE: The sent packet is "Response" and the received packet is "Request"
class ServerProtocol(StreamProtocol[Response, Request]):
    def __init__(self) -> None: ...


async def main() -> None:
    host, port = "localhost", 9000
    protocol = ServerProtocol()
    handler = MyRequestHandler()

    # Create the server, binding to localhost on port 9000
    async with AsyncTCPNetworkServer(host, port, protocol, handler) as server:
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl-C
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())
See also
- An Echo Client/Server Over TCP
A working example of the server implementation.
Run Server In Background
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator
from typing import Any

from easynetwork.clients import AsyncTCPNetworkClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer
from easynetwork.servers import AsyncTCPNetworkServer
from easynetwork.servers.handlers import AsyncStreamClient, AsyncStreamRequestHandler


class JSONProtocol(StreamProtocol[dict[str, Any], dict[str, Any]]):
    def __init__(self) -> None:
        super().__init__(JSONSerializer())


class MyRequestHandler(AsyncStreamRequestHandler[dict[str, Any], dict[str, Any]]):
    async def handle(
        self,
        client: AsyncStreamClient[dict[str, Any]],
    ) -> AsyncGenerator[None, dict[str, Any]]:
        request: dict[str, Any] = yield

        current_task = asyncio.current_task()
        assert current_task is not None

        await client.send_packet({"task": current_task.get_name(), "request": request})


async def client(host: str, port: int, message: str) -> None:
    async with AsyncTCPNetworkClient((host, port), JSONProtocol()) as client:
        await client.send_packet({"message": message})
        response = await client.recv_packet()
        print(f"From server: {response}")


async def main() -> None:
    host, port = "localhost", 9000
    protocol = JSONProtocol()
    handler = MyRequestHandler()

    server = AsyncTCPNetworkServer(host, port, protocol, handler)

    async with server:
        is_up_event = asyncio.Event()
        server_task = asyncio.create_task(server.serve_forever(is_up_event=is_up_event))
        await is_up_event.wait()

        print(f"Server loop running in task: {server_task.get_name()}")

        await client(host, port, "Hello world 1")
        await client(host, port, "Hello world 2")
        await client(host, port, "Hello world 3")

        await server.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
The output of the example should look something like this:
$ python background_server.py
Server loop running in task: Task-2
From server: {'task': 'Task-8', 'request': {'message': 'Hello world 1'}}
From server: {'task': 'Task-11', 'request': {'message': 'Hello world 2'}}
From server: {'task': 'Task-14', 'request': {'message': 'Hello world 3'}}
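The example waits on is_up_event before connecting, because creating the task does not mean the server is already listening. The sketch below models that "ready event" pattern with the standard library only; fake_serve_forever() and its stop_event parameter are hypothetical stand-ins, not part of EasyNetwork.

```python
from __future__ import annotations

import asyncio


async def fake_serve_forever(
    is_up_event: asyncio.Event,
    stop_event: asyncio.Event,
    log: list[str],
) -> None:
    log.append("listening")  # pretend the sockets are now bound
    is_up_event.set()  # tell the waiter that clients may connect
    await stop_event.wait()  # serve until asked to stop
    log.append("stopped")


async def main() -> list[str]:
    log: list[str] = []
    is_up_event, stop_event = asyncio.Event(), asyncio.Event()
    server_task = asyncio.create_task(
        fake_serve_forever(is_up_event, stop_event, log)
    )

    # Without this wait, the next line could run before "listening".
    await is_up_event.wait()
    log.append("client connects")

    stop_event.set()
    await server_task
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```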
SSL/TLS Connection
If you want your server to accept SSL/TLS connections, you need to pass an SSLContext with the required configuration:
from __future__ import annotations

import asyncio
import ssl
from collections.abc import AsyncGenerator
from typing import Any

from easynetwork.clients import AsyncTCPNetworkClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer
from easynetwork.servers import AsyncTCPNetworkServer
from easynetwork.servers.handlers import AsyncStreamClient, AsyncStreamRequestHandler


class JSONProtocol(StreamProtocol[dict[str, Any], dict[str, Any]]):
    def __init__(self) -> None:
        super().__init__(JSONSerializer())


class MyRequestHandler(AsyncStreamRequestHandler[dict[str, Any], dict[str, Any]]):
    async def handle(
        self,
        client: AsyncStreamClient[dict[str, Any]],
    ) -> AsyncGenerator[None, dict[str, Any]]:
        request: dict[str, Any] = yield

        current_task = asyncio.current_task()
        assert current_task is not None

        await client.send_packet({"task": current_task.get_name(), "request": request})


async def client(host: str, port: int, message: str) -> None:
    async with AsyncTCPNetworkClient((host, port), JSONProtocol(), ssl=True) as client:
        await client.send_packet({"message": message})
        response = await client.recv_packet()
        print(f"From server: {response}")


async def main() -> None:
    host, port = "localhost", 9000
    protocol = JSONProtocol()
    handler = MyRequestHandler()

    ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(
        "/path/to/ssl_cert.pem",
        "/path/to/ssl_key.pem",
    )
    server = AsyncTCPNetworkServer(host, port, protocol, handler, ssl=ssl_context)

    async with server:
        is_up_event = asyncio.Event()
        server_task = asyncio.create_task(server.serve_forever(is_up_event=is_up_event))
        await is_up_event.wait()

        print(f"Server loop running in task: {server_task.get_name()}")

        await client(host, port, "Hello world 1")
        await client(host, port, "Hello world 2")
        await client(host, port, "Hello world 3")

        await server.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
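The use of ssl.Purpose.CLIENT_AUTH for a server can look backwards at first: the purpose names the peer that the context is meant to authenticate, so CLIENT_AUTH produces a server-side context. The short sketch below compares the defaults of both purposes using only the standard ssl module; the describe() helper is a hypothetical name.

```python
from __future__ import annotations

import ssl


def describe(purpose: ssl.Purpose) -> tuple[ssl.VerifyMode, bool]:
    """Return the verification defaults chosen for the given purpose."""
    context = ssl.create_default_context(purpose)
    return context.verify_mode, context.check_hostname


# Server-side context: does not require a certificate from clients by default.
server_side = describe(ssl.Purpose.CLIENT_AUTH)

# Client-side context: verifies the server certificate and its hostname.
client_side = describe(ssl.Purpose.SERVER_AUTH)

if __name__ == "__main__":
    print("server-side:", server_side)
    print("client-side:", client_side)
```

A server-side context still needs a certificate and private key of its own, which is what the load_cert_chain() call in the example provides.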