How-to — UDP Servers

Note

This page uses two different API variants:

  • Synchronous API with classic def functions, usable in any context.

  • Asynchronous API with async def functions, using an asynchronous framework to perform I/O operations.

All asynchronous API examples assume that you are using asyncio, but you can use a different library thanks to the asynchronous backend engine API.


Introduction

Creating a UDP server requires several steps:

  1. Derive a class from AsyncDatagramRequestHandler and redefine its handle() method; this method will process incoming requests.

  2. Instantiate the AsyncUDPNetworkServer class, passing it the server’s address, the protocol object, and the request handler instance.

  3. Call serve_forever() to process requests.

Request Handler Objects

Note

Unlike socketserver.BaseRequestHandler, there is only one AsyncDatagramRequestHandler instance for the entire service.

Here is a simple example:

    from __future__ import annotations

    from collections.abc import AsyncGenerator

    from easynetwork.servers.handlers import AsyncDatagramClient, AsyncDatagramRequestHandler


    class Request:
        """Object representing the client request."""

        ...


    class Response:
        """Object representing the response to send to the client."""

        ...


    class MyRequestHandler(AsyncDatagramRequestHandler[Request, Response]):
        """
        The request handler class for our server.

        It is instantiated once for the whole server, and must
        override the handle() method to implement communication with the
        client.
        """

        async def handle(
            self,
            client: AsyncDatagramClient[Response],
        ) -> AsyncGenerator[None, Request]:
            # "client" is a placeholder that provides a stream-like API.
            # All the datagrams sent by this client are received
            # through the "yield" statement.
            request: Request = yield

            # Do some stuff
            ...

            response = Response()

            # The corresponding call is server_socket.sendto(data, remote_address)
            await client.send_packet(response)

Using handle() Generator

Important

There will always be only one active generator per client. All the pending datagrams received while the generator is running are queued.

This behavior is designed to act like a stream request handler.
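To make the "one generator per client, pending datagrams queued" rule concrete, here is a minimal standard-library sketch of how a driver could run such a generator. It is not EasyNetwork's actual implementation: serve_one_client(), the log list, and the string "datagrams" are hypothetical names used only for illustration.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator


async def handle(log: list[str]) -> AsyncGenerator[None, str]:
    # Setup part: runs when the generator is started.
    log.append("started")
    request = yield
    # Runs once the request has been sent into the generator.
    log.append(f"handled {request}")


async def serve_one_client(queue: asyncio.Queue[str], log: list[str]) -> None:
    # Hypothetical driver: only one generator is active at a time,
    # and queued datagrams are consumed in arrival order.
    while not queue.empty():
        datagram = queue.get_nowait()
        generator = handle(log)
        try:
            await generator.asend(None)  # run up to the first "yield"
            await generator.asend(datagram)  # resume with the request
        except StopAsyncIteration:
            pass  # handle() returned: this request is finished
        finally:
            await generator.aclose()


async def demo() -> list[str]:
    queue: asyncio.Queue[str] = asyncio.Queue()
    for datagram in ("A", "B"):
        queue.put_nowait(datagram)
    log: list[str] = []
    await serve_one_client(queue, log)
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The second datagram is only handled after the first generator has finished, which is what gives the handler its stream-like behavior.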

Minimum Requirements

    async def handle(
        self,
        client: AsyncDatagramClient[Response],
    ) -> AsyncGenerator[None, Request]:
        ### Before 'yield'
        # Initializes the generator.
        # This is the setup part before receiving a request.
        # Unlike the stream request handler, the generator is started
        # when the datagram is received (but is not parsed yet).
        ##################

        request: Request = yield

        ### After 'yield'
        # The received datagram is parsed.
        # You can do whatever you want with it and send responses back
        # to the client if necessary.
        await client.send_packet(Response())
        #################

        ### On a 'return'
        # When handle() returns, it means that this request handler is finished.
        # The server creates a new generator when a new datagram is received.
        #################
        return

Refuse datagrams

Your UDP socket can receive datagrams from anywhere. You may want to control who can send you information.

    async def handle(
        self,
        client: AsyncDatagramClient[Response],
    ) -> AsyncGenerator[None, Request]:
        if not self.should_handle(client):
            # By returning before the "yield" statement, you ask the server to discard
            # the received datagram.
            return

        request: Request = yield
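The should_handle() method above is left to the application. One possible policy, sketched below, is an allowlist of networks checked with the standard ipaddress module. The ALLOWED_NETWORKS values and the choice to take a host string are assumptions made for illustration; in a real handler the host would come from the client's remote address.

```python
from __future__ import annotations

import ipaddress

# Hypothetical allowlist; adapt it to your deployment.
ALLOWED_NETWORKS = [
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("192.168.1.0/24"),
]


def should_handle(host: str) -> bool:
    """Return True if the sender's host belongs to an allowed network."""
    try:
        address = ipaddress.ip_address(host)
    except ValueError:
        # Not a valid IP address: refuse by default.
        return False
    # Membership is False when the IP versions differ (IPv4 vs IPv6).
    return any(address in network for network in ALLOWED_NETWORKS)
```

Refusing unknown senders before the yield keeps the server from spending time parsing datagrams it will never answer.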

Error Handling

    async def handle(
        self,
        client: AsyncDatagramClient[Response],
    ) -> AsyncGenerator[None, Request]:
        try:
            # *All* exceptions are thrown through the "yield" statement
            # (including BaseException). But you should only catch Exception subclasses.
            request: Request = yield
        except DatagramProtocolParseError:
            await client.send_packet(BadRequest())
        except Exception:
            # Most likely a bug in EasyNetwork code. Log the error.
            traceback.print_exc()

            await client.send_packet(InternalError())
        else:
            await client.send_packet(Response())

Warning

You should always log or re-raise a bare Exception thrown in your generator.

    except Exception:
        # Most likely a bug in EasyNetwork code. Log the error.
        traceback.print_exc()

        await client.send_packet(InternalError())
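For readers unfamiliar with exceptions being "thrown through" a yield, the following standard-library sketch shows the mechanism with athrow(). It is only an illustration under stated assumptions: ParseError and run_handler() are hypothetical stand-ins, and ASCII decoding plays the role of the protocol's parsing step.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator


class ParseError(Exception):
    """Hypothetical stand-in for a datagram parsing error."""


async def handle(sent: list[str]) -> AsyncGenerator[None, str]:
    try:
        request = yield
    except ParseError:
        sent.append("bad request")
    else:
        sent.append(f"ok: {request}")


async def run_handler(datagram: bytes) -> list[str]:
    sent: list[str] = []
    generator = handle(sent)
    await generator.asend(None)  # run up to the "yield"
    try:
        try:
            request = datagram.decode("ascii")  # the "parsing" step
        except UnicodeDecodeError as exc:
            # The parse failure is raised inside the generator, at the "yield".
            await generator.athrow(ParseError(str(exc)))
        else:
            await generator.asend(request)
    except StopAsyncIteration:
        pass  # the generator returned after handling the outcome
    return sent


if __name__ == "__main__":
    print(asyncio.run(run_handler(b"ping")))
    print(asyncio.run(run_handler(b"\xff")))
```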

Having Multiple yield Statements

    async def handle(
        self,
        client: AsyncDatagramClient[Response],
    ) -> AsyncGenerator[None, Request]:
        request: Request = yield

        ...

        await client.send_packet(Response())

        if self.need_something_else(request, client):
            additional_data: Request = yield

            ...

            await client.send_packet(Response())

Warning

Even though this feature is supported, having more than one yield statement is not recommended (unless you know what you are doing), for the following reasons:

  • UDP does not guarantee ordered delivery. Packets are typically “sent” in order, but they may be received out of order. In large networks, it is reasonably common for some packets to arrive out of sequence (or not at all).

  • The server has no way of knowing whether the client has permanently stopped sending requests.

If you plan to use multiple yields in your request handler, you should always have a timeout applied. (See the section below.)

Cancellation And Timeouts

The handler can pass a timeout delay to the parent task by yielding it:

    async def handle(
        self,
        client: AsyncDatagramClient[Response],
    ) -> AsyncGenerator[float | None, Request]:
        # It is *never* useful to have a timeout for the 1st datagram
        # because the datagram is already in the queue.
        request: Request = yield None

        ...

        await client.send_packet(Response())

        try:
            # The client has 30 seconds to send the 2nd request to the server.
            another_request: Request = yield 30
        except TimeoutError:
            await client.send_packet(TimedOut())
        else:
            await client.send_packet(Response())
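The sketch below illustrates how a yielded value can act as a timeout, using only the standard library. It is not the server's real code: drive() is a hypothetical driver that waits on a queue with the yielded delay and throws TimeoutError into the generator when the delay expires. The delay is shortened to 0.05 seconds so the example finishes quickly.

```python
from __future__ import annotations

import asyncio
from collections.abc import AsyncGenerator


async def handle(sent: list[str]) -> AsyncGenerator[float | None, str]:
    first = yield None  # no timeout: the first datagram is already queued
    sent.append(f"first: {first}")
    try:
        second = yield 0.05  # wait at most 0.05 seconds for the next datagram
    except TimeoutError:
        sent.append("timed out")
    else:
        sent.append(f"second: {second}")


async def drive(queue: asyncio.Queue[str]) -> list[str]:
    # Hypothetical driver: the value received from the generator is the timeout.
    sent: list[str] = []
    generator = handle(sent)
    try:
        timeout = await generator.asend(None)
        while True:
            try:
                request = await asyncio.wait_for(queue.get(), timeout)
            except asyncio.TimeoutError:
                timeout = await generator.athrow(TimeoutError())
            else:
                timeout = await generator.asend(request)
    except StopAsyncIteration:
        pass  # handle() returned
    return sent


async def demo(datagrams: list[str]) -> list[str]:
    queue: asyncio.Queue[str] = asyncio.Queue()
    for datagram in datagrams:
        queue.put_nowait(datagram)
    return await drive(queue)


if __name__ == "__main__":
    print(asyncio.run(demo(["A"])))
    print(asyncio.run(demo(["A", "B"])))
```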

Service Initialization

The server will call service_init() and pass it an AsyncExitStack at the beginning of the serve_forever() task to set up the global service.

This allows you to do something like this:

    async def service_init(
        self,
        exit_stack: contextlib.AsyncExitStack,
        server: AsyncUDPNetworkServer[Request, Response],
    ) -> None:
        exit_stack.callback(self._service_quit)

        self.background_tasks = await exit_stack.enter_async_context(asyncio.TaskGroup())

        _ = self.background_tasks.create_task(self._service_actions())

    async def _service_actions(self) -> None:
        while True:
            await asyncio.sleep(1)

            # Do some stuff each second in background
            ...

    def _service_quit(self) -> None:
        print("Service stopped")
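The order in which an AsyncExitStack unwinds matters when you register several cleanups. The standalone sketch below shows the standard library's last-in, first-out behavior: the callback registered first runs last, after the context manager entered later has been closed. The resource() context manager and the events list are hypothetical and exist only to record the order.

```python
from __future__ import annotations

import asyncio
import contextlib


async def demo() -> list[str]:
    events: list[str] = []

    @contextlib.asynccontextmanager
    async def resource():
        # Hypothetical resource that records when it is opened and closed.
        events.append("resource opened")
        try:
            yield
        finally:
            events.append("resource closed")

    async with contextlib.AsyncExitStack() as exit_stack:
        # Registered first, so it runs last when the stack unwinds.
        exit_stack.callback(events.append, "service stopped")
        await exit_stack.enter_async_context(resource())
        events.append("serving")

    return events


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

By the same rule, the example above would call _service_quit() after the task group has been closed.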

Per-client variables (contextvars integration)

If your asynchronous framework supports per-task context variables, you can use this feature in your request handler:

    class ClientContextRequestHandler(AsyncDatagramRequestHandler[Request, Response]):
        client_addr_var: ClassVar[contextvars.ContextVar[SocketAddress]]
        client_addr_var = contextvars.ContextVar("client_addr")

        @classmethod
        def client_log(cls, message: str) -> None:
            # The address of the currently handled client can be accessed
            # without passing it explicitly to this function.

            logger = logging.getLogger(cls.__name__)

            client_address = cls.client_addr_var.get()

            logger.info("From %s: %s", client_address, message)

        async def handle(
            self,
            client: AsyncDatagramClient[Response],
        ) -> AsyncGenerator[None, Request]:
            address = client.extra(INETClientAttribute.remote_address)
            self.client_addr_var.set(address)

            # Any code called from "handle()" can now get the
            # client's address by calling 'client_addr_var.get()'.

            request: Request = yield

            self.client_log(f"Received request: {request!r}")

            await client.send_packet(Response())

Tip

The context that is copied for each client can be initialized in service_init().

This means that the contextvars.ContextVar.set() calls made in service_init() will be applied to subsequent client tasks.
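The following sketch shows the underlying asyncio behavior this relies on, without using EasyNetwork: a task copies the context that is current when it is created, so values set beforehand are visible inside it, while set() calls made inside a task stay local to that task. The variable names and addresses are hypothetical.

```python
from __future__ import annotations

import asyncio
import contextvars

service_name: contextvars.ContextVar[str] = contextvars.ContextVar("service_name")
client_addr: contextvars.ContextVar[str] = contextvars.ContextVar("client_addr", default="<unset>")


async def client_task(address: str) -> tuple[str, str]:
    # This set() only affects the context copy owned by this task.
    client_addr.set(address)
    await asyncio.sleep(0)
    return service_name.get(), client_addr.get()


async def demo() -> tuple[list[tuple[str, str]], str]:
    # Plays the role of a set() call made during service initialization.
    service_name.set("udp-demo")
    results = await asyncio.gather(
        asyncio.create_task(client_task("10.0.0.1")),
        asyncio.create_task(client_task("10.0.0.2")),
    )
    # The parent context never sees the per-task values.
    return results, client_addr.get()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```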

Server Object

A basic example of how to run the server:

    from __future__ import annotations

    import asyncio
    from collections.abc import AsyncGenerator

    from easynetwork.protocol import DatagramProtocol
    from easynetwork.servers import AsyncUDPNetworkServer
    from easynetwork.servers.handlers import AsyncDatagramClient, AsyncDatagramRequestHandler


    class Request: ...


    class Response: ...


    class MyRequestHandler(AsyncDatagramRequestHandler[Request, Response]):
        async def handle(
            self,
            client: AsyncDatagramClient[Response],
        ) -> AsyncGenerator[None, Request]:
            request: Request = yield

            ...

            await client.send_packet(Response())


    # NOTE: The sent packet is "Response" and the received packet is "Request"
    class ServerProtocol(DatagramProtocol[Response, Request]):
        def __init__(self) -> None: ...


    async def main() -> None:
        host, port = "localhost", 9000
        protocol = ServerProtocol()
        handler = MyRequestHandler()

        # Create the server, binding to localhost on port 9000
        async with AsyncUDPNetworkServer(host, port, protocol, handler) as server:
            # Activate the server; this will keep running until you
            # interrupt the program with Ctrl-C
            await server.serve_forever()


    if __name__ == "__main__":
        asyncio.run(main())

See also

An Echo Client/Server Over UDP

A working example of the server implementation.

Run Server In Background

    from __future__ import annotations

    import asyncio
    from collections.abc import AsyncGenerator
    from typing import Any

    from easynetwork.clients import AsyncUDPNetworkClient
    from easynetwork.protocol import DatagramProtocol
    from easynetwork.serializers import JSONSerializer
    from easynetwork.servers import AsyncUDPNetworkServer
    from easynetwork.servers.handlers import AsyncDatagramClient, AsyncDatagramRequestHandler


    class JSONProtocol(DatagramProtocol[dict[str, Any], dict[str, Any]]):
        def __init__(self) -> None:
            super().__init__(JSONSerializer())


    class MyRequestHandler(AsyncDatagramRequestHandler[dict[str, Any], dict[str, Any]]):
        async def handle(
            self,
            client: AsyncDatagramClient[dict[str, Any]],
        ) -> AsyncGenerator[None, dict[str, Any]]:
            request: dict[str, Any] = yield

            current_task = asyncio.current_task()
            assert current_task is not None

            await client.send_packet({"task": current_task.get_name(), "request": request})


    async def client(host: str, port: int, message: str) -> None:
        async with AsyncUDPNetworkClient((host, port), JSONProtocol()) as client:
            await client.send_packet({"message": message})
            response = await client.recv_packet()
            print(f"From server: {response}")


    async def main() -> None:
        host, port = "localhost", 9000
        protocol = JSONProtocol()
        handler = MyRequestHandler()

        server = AsyncUDPNetworkServer(host, port, protocol, handler)

        async with server:
            is_up_event = asyncio.Event()
            server_task = asyncio.create_task(server.serve_forever(is_up_event=is_up_event))
            await is_up_event.wait()

            print(f"Server loop running in task: {server_task.get_name()}")

            await client(host, port, "Hello world 1")
            await client(host, port, "Hello world 2")
            await client(host, port, "Hello world 3")

            await server.shutdown()


    if __name__ == "__main__":
        asyncio.run(main())

The output of the example should look something like this:

$ python background_server.py
Server loop running in task: Task-2
From server: {'task': 'Task-6', 'request': {'message': 'Hello world 1'}}
From server: {'task': 'Task-7', 'request': {'message': 'Hello world 2'}}
From server: {'task': 'Task-8', 'request': {'message': 'Hello world 3'}}