How-to — UDP Servers
Note
This page uses two different API variants:
- Synchronous API with classic def functions, usable in any context.
- Asynchronous API with async def functions, using an asynchronous framework to perform I/O operations.
All asynchronous API examples assume that you are using asyncio, but you can use a different library thanks to the asynchronous backend engine API.
Introduction
Creating a UDP server requires several steps:
1. Derive a class from AsyncDatagramRequestHandler and redefine its handle() method; this method will process incoming requests.
2. Instantiate the AsyncUDPNetworkServer class, passing it the server’s address, the protocol object and the request handler instance.
3. Call serve_forever() to process requests.
Request Handler Objects
Note
Unlike socketserver.BaseRequestHandler, there is only one AsyncDatagramRequestHandler instance for the entire service.
Here is a simple example:
from __future__ import annotations

from collections.abc import AsyncGenerator

from easynetwork.servers.handlers import AsyncDatagramClient, AsyncDatagramRequestHandler


class Request:
    """Object representing the client request."""

    ...


class Response:
    """Object representing the response to send to the client."""

    ...


class MyRequestHandler(AsyncDatagramRequestHandler[Request, Response]):
    """
    The request handler class for our server.

    It is instantiated once for the server, and must
    override the handle() method to implement communication with the
    client.
    """

    async def handle(
        self,
        client: AsyncDatagramClient[Response],
    ) -> AsyncGenerator[None, Request]:
        # "client" is a placeholder to have a stream-like API.
        # All the datagrams sent by this client are delivered
        # through the "yield" statement.
        request: Request = yield

        # Do some stuff
        ...

        response = Response()

        # The corresponding call is server_socket.sendto(data, remote_address)
        await client.send_packet(response)
Using handle() Generator
Important
There will always be only one active generator per client. All the pending datagrams received while the generator is running are queued.
This behavior is designed to act like a stream request handler.
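To make the "one generator per client, pending datagrams queued" rule concrete, here is a minimal sketch that uses only the standard library. It is not EasyNetwork's implementation: `drive_handler`, `echo_handler` and the `sent` list standing in for the client are hypothetical names chosen for illustration.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator, Callable


async def echo_handler(sent: list[str]) -> AsyncGenerator[None, str]:
    # Setup part: runs when the generator is started.
    request: str = yield
    # After 'yield': the request is available, so a response can be "sent".
    sent.append(f"echo: {request}")


async def drive_handler(
    handler: Callable[[list[str]], AsyncGenerator[None, str]],
    queue: asyncio.Queue[str],
    sent: list[str],
) -> None:
    # Only one generator is active at a time; other datagrams wait in the queue.
    while not queue.empty():
        datagram = queue.get_nowait()
        generator = handler(sent)
        try:
            # Start the generator: run the setup part up to the first 'yield'.
            await generator.asend(None)
            # Resume the generator with the datagram that triggered it.
            await generator.asend(datagram)
            while True:
                # The handler yielded again: it is waiting for another datagram.
                await generator.asend(await queue.get())
        except StopAsyncIteration:
            # The handler returned; the next datagram gets a new generator.
            pass
        finally:
            await generator.aclose()


async def demo() -> list[str]:
    queue: asyncio.Queue[str] = asyncio.Queue()
    for datagram in ("a", "b", "c"):
        queue.put_nowait(datagram)
    sent: list[str] = []
    await drive_handler(echo_handler, queue, sent)
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Each queued datagram is handled by a fresh generator, in arrival order, which is what gives the handler its stream-like feel.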
Minimum Requirements
async def handle(
    self,
    client: AsyncDatagramClient[Response],
) -> AsyncGenerator[None, Request]:
    ### Before 'yield'
    # Initializes the generator.
    # This is the setup part before receiving a request.
    # Unlike the stream request handler, the generator is started
    # when the datagram is received (but is not parsed yet).
    ##################

    request: Request = yield

    ### After 'yield'
    # The received datagram is parsed.
    # You can do whatever you want with it and send responses back
    # to the client if necessary.
    await client.send_packet(Response())
    #################

    ### On a 'return'
    # When handle() returns, it means that this request handler is finished.
    # The server creates a new generator when a new datagram is received.
    #################
    return
Refuse datagrams
Your UDP socket can receive datagrams from anywhere. You may want to control who can send you information.
async def handle(
    self,
    client: AsyncDatagramClient[Response],
) -> AsyncGenerator[None, Request]:
    if not self.should_handle(client):
        # By returning before the "yield" statement, you ask the server to discard
        # the received datagram.
        return

    request: Request = yield
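The `should_handle()` method above is not defined on this page. As one possible sketch, the decision could be an allowlist check on the sender's IP address. The function name `is_allowed_host` and the networks in `ALLOWED_NETWORKS` are assumptions made for illustration; only the standard `ipaddress` module is used.

```python
from __future__ import annotations

import ipaddress

# Hypothetical allowlist: loopback plus one private network.
ALLOWED_NETWORKS = (
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("192.168.1.0/24"),
)


def is_allowed_host(host: str) -> bool:
    """Return True if 'host' is an IP address inside one of the allowed networks."""
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        # Not a valid IP address literal: refuse it.
        return False
    # An IPv6 address is never "in" an IPv4 network, so it is refused here.
    return any(address in network for network in ALLOWED_NETWORKS)


if __name__ == "__main__":
    for candidate in ("127.0.0.1", "192.168.1.42", "10.0.0.1"):
        print(candidate, is_allowed_host(candidate))
```

A `should_handle()` implementation could obtain the remote address from the client object and pass its host part to a helper like this one.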
Error Handling
async def handle(
    self,
    client: AsyncDatagramClient[Response],
) -> AsyncGenerator[None, Request]:
    try:
        # *All* exceptions are thrown through the "yield" statement
        # (including BaseException). But you should only catch Exception subclasses.
        request: Request = yield
    except DatagramProtocolParseError:
        await client.send_packet(BadRequest())
    except Exception:
        # Most likely a bug in EasyNetwork code. Log the error.
        traceback.print_exc()

        await client.send_packet(InternalError())
    else:
        await client.send_packet(Response())
Warning
You should always log or re-raise a bare Exception thrown in your generator.
except Exception:
    # Most likely a bug in EasyNetwork code. Log the error.
    traceback.print_exc()

    await client.send_packet(InternalError())
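The phrase "thrown through the yield statement" refers to the standard async generator method `athrow()`. The sketch below shows the mechanism with the standard library only. `ParseError`, `handler` and `deliver` are hypothetical names, and the `sent` list stands in for the client; this is an illustration, not the library's code.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator


class ParseError(Exception):
    """Hypothetical stand-in for a datagram parsing error."""


async def handler(sent: list[str]) -> AsyncGenerator[None, str]:
    try:
        request: str = yield
    except ParseError:
        sent.append("bad request")
    else:
        sent.append(f"ok: {request}")


async def deliver(sent: list[str], datagram: str | Exception) -> None:
    generator = handler(sent)
    await generator.asend(None)  # run the generator up to its 'yield'
    try:
        if isinstance(datagram, Exception):
            # The error is raised inside the generator, at the 'yield' statement.
            await generator.athrow(datagram)
        else:
            await generator.asend(datagram)
    except StopAsyncIteration:
        pass  # the generator returned normally
    finally:
        await generator.aclose()


async def demo() -> list[str]:
    sent: list[str] = []
    await deliver(sent, "hello")
    await deliver(sent, ParseError("invalid payload"))
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the exception surfaces at the `yield`, an ordinary `try`/`except` around it is enough to answer with an error response.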
Having Multiple yield Statements
async def handle(
    self,
    client: AsyncDatagramClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    ...

    await client.send_packet(Response())

    if self.need_something_else(request, client):
        additional_data: Request = yield

        ...

        await client.send_packet(Response())
Warning
Even though this feature is supported, having more than one yield statement is not recommended (unless you know what you are doing), for the following reasons:
- UDP does not guarantee ordered delivery. Packets are typically “sent” in order, but they may be received out of order. In large networks, it is reasonably common for some packets to arrive out of sequence (or not at all).
- The server has no way of knowing whether this client has stopped sending requests for good.
If you plan to use multiple yields in your request handler, you should always apply a timeout. (See the section below.)
Cancellation And Timeouts
You can pass a timeout delay to the parent task by yielding it:
async def handle(
    self,
    client: AsyncDatagramClient[Response],
) -> AsyncGenerator[float | None, Request]:
    # It is *never* useful to have a timeout for the 1st datagram
    # because the datagram is already in the queue.
    request: Request = yield None

    ...

    await client.send_packet(Response())

    try:
        # The client has 30 seconds to send the 2nd request to the server.
        another_request: Request = yield 30
    except TimeoutError:
        await client.send_packet(TimedOut())
    else:
        await client.send_packet(Response())
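As a rough illustration of how a yielded timeout can reach the code that drives the generator, the following standard-library sketch reads the value returned by `asend()` and uses it to bound the wait for the next datagram. The names `handler`, `drive` and `demo` are hypothetical, the delay is shortened to 50 ms so the example finishes quickly, and nothing here should be read as EasyNetwork's actual implementation.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator


async def handler(sent: list[str]) -> AsyncGenerator[float | None, str]:
    request: str = yield None  # no timeout for the first datagram
    sent.append(f"first: {request}")
    try:
        second: str = yield 0.05  # ask the driver to wait at most 50 ms
    except TimeoutError:
        sent.append("timed out")
    else:
        sent.append(f"second: {second}")


async def drive(queue: asyncio.Queue[str], sent: list[str]) -> None:
    generator = handler(sent)
    try:
        # The value produced by 'yield' is what asend() and athrow() return.
        timeout = await generator.asend(None)
        while True:
            try:
                datagram = await asyncio.wait_for(queue.get(), timeout)
            except asyncio.TimeoutError:
                # Report the expired delay at the generator's 'yield'.
                timeout = await generator.athrow(TimeoutError())
            else:
                timeout = await generator.asend(datagram)
    except StopAsyncIteration:
        pass  # the handler returned
    finally:
        await generator.aclose()


async def demo(datagrams: list[str]) -> list[str]:
    queue: asyncio.Queue[str] = asyncio.Queue()
    for datagram in datagrams:
        queue.put_nowait(datagram)
    sent: list[str] = []
    await drive(queue, sent)
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo(["hello"])))
    print(asyncio.run(demo(["hello", "again"])))
```

With a single datagram the second `yield` times out; with two datagrams the handler completes normally.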
Since all BaseException subclasses are thrown into the generator, you can apply a timeout to the read stream using the asynchronous framework (the cancellation exception is retrieved in the generator):
async def handle(
    self,
    client: AsyncDatagramClient[Response],
) -> AsyncGenerator[None, Request]:
    # It is *never* useful to have a timeout for the 1st datagram
    # because the datagram is already in the queue.
    request: Request = yield

    ...

    await client.send_packet(Response())

    try:
        async with asyncio.timeout(30):
            # The client has 30 seconds to send the 2nd request to the server.
            another_request: Request = yield
    except TimeoutError:
        await client.send_packet(TimedOut())
    else:
        await client.send_packet(Response())
Warning
This behavior works only because, in the current implementation, the generator is always executed and closed in the same asynchronous task.
It is available so that features like anyio.CancelScope can be used.
However, it may be removed in a future release.
Service Initialization
The server will call service_init() and pass it an AsyncExitStack at the beginning of the serve_forever() task to set up the global service.
This allows you to do something like this:
async def service_init(
    self,
    exit_stack: contextlib.AsyncExitStack,
    server: AsyncUDPNetworkServer[Request, Response],
) -> None:
    exit_stack.callback(self._service_quit)

    self.background_tasks = await exit_stack.enter_async_context(asyncio.TaskGroup())

    _ = self.background_tasks.create_task(self._service_actions())

async def _service_actions(self) -> None:
    while True:
        await asyncio.sleep(1)

        # Do some stuff each second in background
        ...

def _service_quit(self) -> None:
    print("Service stopped")
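The order of the registrations on the exit stack matters, because `contextlib.AsyncExitStack` unwinds in last-in, first-out order. The standard-library sketch below illustrates that ordering. The `resource` context manager and the event strings are hypothetical stand-ins for the task group and the quit callback.

```python
from __future__ import annotations

import asyncio
import contextlib
from collections.abc import AsyncIterator


@contextlib.asynccontextmanager
async def resource(name: str, events: list[str]) -> AsyncIterator[None]:
    events.append(f"open {name}")
    try:
        yield
    finally:
        events.append(f"close {name}")


async def demo() -> list[str]:
    events: list[str] = []
    async with contextlib.AsyncExitStack() as exit_stack:
        # Registered first, so it runs last when the stack unwinds.
        exit_stack.callback(events.append, "service stopped")
        # Entered second, so it is closed first.
        await exit_stack.enter_async_context(resource("background tasks", events))
        events.append("serving")
    return events


if __name__ == "__main__":
    for event in asyncio.run(demo()):
        print(event)
```

Registering the quit callback before entering the other context managers therefore means it runs after they have been closed.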
Per-client variables (contextvars integration)
If your asynchronous framework supports per-task context variables, you can use this feature in your request handler:
class ClientContextRequestHandler(AsyncDatagramRequestHandler[Request, Response]):
    client_addr_var: ClassVar[contextvars.ContextVar[SocketAddress]]
    client_addr_var = contextvars.ContextVar("client_addr")

    @classmethod
    def client_log(cls, message: str) -> None:
        # The address of the currently handled client can be accessed
        # without passing it explicitly to this function.

        logger = logging.getLogger(cls.__name__)

        client_address = cls.client_addr_var.get()

        logger.info("From %s: %s", client_address, message)

    async def handle(
        self,
        client: AsyncDatagramClient[Response],
    ) -> AsyncGenerator[None, Request]:
        address = client.extra(INETClientAttribute.remote_address)
        self.client_addr_var.set(address)

        # Any code called from within "handle()" can now get the
        # client's address by calling 'client_addr_var.get()'.

        request: Request = yield

        self.client_log(f"Received request: {request!r}")

        await client.send_packet(Response())
Tip
It is possible to initialize the context to be copied in service_init().
This means that the contextvars.ContextVar.set() calls made in service_init() will be applied to subsequent client tasks.
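The copy semantics described in the tip can be observed with plain asyncio, where each task created with `asyncio.create_task()` runs in a copy of the current context. The sketch below is an analogy under that assumption, not a description of how EasyNetwork schedules client tasks. The variable names and addresses are made up for the example.

```python
from __future__ import annotations

import asyncio
import contextvars

client_addr_var: contextvars.ContextVar[str] = contextvars.ContextVar("client_addr", default="<unset>")
service_name_var: contextvars.ContextVar[str] = contextvars.ContextVar("service_name")


async def handle_client(address: str, seen: dict[str, tuple[str, str]]) -> None:
    # This set() only affects the context copy owned by the current task.
    client_addr_var.set(address)
    await asyncio.sleep(0)  # let the other task run in between
    seen[address] = (service_name_var.get(), client_addr_var.get())


async def demo() -> tuple[dict[str, tuple[str, str]], str]:
    # Set before the tasks are created, so every task inherits the value.
    service_name_var.set("my-udp-service")
    seen: dict[str, tuple[str, str]] = {}
    tasks = [
        asyncio.create_task(handle_client(address, seen))
        for address in ("10.0.0.1", "10.0.0.2")
    ]
    await asyncio.gather(*tasks)
    # The parent context never sees the per-task addresses.
    return seen, client_addr_var.get()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Values set before the tasks start are inherited, while values set inside a task stay private to it.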
Server Object
A basic example of how to run the server:
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator

from easynetwork.protocol import DatagramProtocol
from easynetwork.servers import AsyncUDPNetworkServer
from easynetwork.servers.handlers import AsyncDatagramClient, AsyncDatagramRequestHandler


class Request: ...


class Response: ...


class MyRequestHandler(AsyncDatagramRequestHandler[Request, Response]):
    async def handle(
        self,
        client: AsyncDatagramClient[Response],
    ) -> AsyncGenerator[None, Request]:
        request: Request = yield

        ...

        await client.send_packet(Response())


# NOTE: The sent packet is "Response" and the received packet is "Request"
class ServerProtocol(DatagramProtocol[Response, Request]):
    def __init__(self) -> None: ...


async def main() -> None:
    host, port = "localhost", 9000
    protocol = ServerProtocol()
    handler = MyRequestHandler()

    # Create the server, binding to localhost on port 9000
    async with AsyncUDPNetworkServer(host, port, protocol, handler) as server:
        # Activate the server; this will keep running until you
        # interrupt the program with Ctrl-C
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())
See also
- An Echo Client/Server Over UDP
A working example of the server implementation.
Run Server In Background
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator
from typing import Any

from easynetwork.clients import AsyncUDPNetworkClient
from easynetwork.protocol import DatagramProtocol
from easynetwork.serializers import JSONSerializer
from easynetwork.servers import AsyncUDPNetworkServer
from easynetwork.servers.handlers import AsyncDatagramClient, AsyncDatagramRequestHandler


class JSONProtocol(DatagramProtocol[dict[str, Any], dict[str, Any]]):
    def __init__(self) -> None:
        super().__init__(JSONSerializer())


class MyRequestHandler(AsyncDatagramRequestHandler[dict[str, Any], dict[str, Any]]):
    async def handle(
        self,
        client: AsyncDatagramClient[dict[str, Any]],
    ) -> AsyncGenerator[None, dict[str, Any]]:
        request: dict[str, Any] = yield

        current_task = asyncio.current_task()
        assert current_task is not None

        await client.send_packet({"task": current_task.get_name(), "request": request})


async def client(host: str, port: int, message: str) -> None:
    async with AsyncUDPNetworkClient((host, port), JSONProtocol()) as client:
        await client.send_packet({"message": message})
        response = await client.recv_packet()
        print(f"From server: {response}")


async def main() -> None:
    host, port = "localhost", 9000
    protocol = JSONProtocol()
    handler = MyRequestHandler()

    server = AsyncUDPNetworkServer(host, port, protocol, handler)

    async with server:
        is_up_event = asyncio.Event()
        server_task = asyncio.create_task(server.serve_forever(is_up_event=is_up_event))
        await is_up_event.wait()

        print(f"Server loop running in task: {server_task.get_name()}")

        await client(host, port, "Hello world 1")
        await client(host, port, "Hello world 2")
        await client(host, port, "Hello world 3")

        await server.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
The output of the example should look something like this:
$ python background_server.py
Server loop running in task: Task-2
From server: {'task': 'Task-6', 'request': {'message': 'Hello world 1'}}
From server: {'task': 'Task-7', 'request': {'message': 'Hello world 2'}}
From server: {'task': 'Task-8', 'request': {'message': 'Hello world 3'}}