How-to — UDP Servers

Note

This page uses two different API variants:

  • Synchronous API with classic def functions, usable in any context.

  • Asynchronous API with async def functions, using an asynchronous framework to perform I/O operations.

All asynchronous API examples assume that you are using asyncio, but you can use a different library thanks to the asynchronous backend engine API.


Introduction

Creating a UDP server requires several steps:

  1. Derive a class from AsyncDatagramRequestHandler and redefine its handle() method; this method will process incoming requests.

  2. Instantiate the AsyncUDPNetworkServer class passing it the server’s address, the protocol object and the request handler instance.

  3. Call serve_forever() to process requests.

Request Handler Objects

Note

Unlike socketserver.BaseRequestHandler, there is only one AsyncDatagramRequestHandler instance for the entire service.

Here is a simple example:

 1from __future__ import annotations
 2
 3from collections.abc import AsyncGenerator
 4
 5from easynetwork.api_async.server import AsyncDatagramClient, AsyncDatagramRequestHandler
 6
 7
 8class Request:
 9    """Object representing the client request."""
10
11    ...
12
13
14class Response:
15    """Object representing the response to send to the client."""
16
17    ...
18
19
20class MyRequestHandler(AsyncDatagramRequestHandler[Request, Response]):
21    """
22    The request handler class for our server.
23
24    It is instantiated once to the server, and must
25    override the handle() method to implement communication to the
26    client.
27    """
28
29    async def handle(
30        self,
31        client: AsyncDatagramClient[Response],
32    ) -> AsyncGenerator[None, Request]:
33        # "client" a placeholder to have a stream-like API.
34        # All the datagrams sent by this client are sent
35        # through the "yield" statement.
36        request: Request = yield
37
38        # Do some stuff
39        ...
40
41        response = Response()
42
43        # The corresponding call is server_socket.sendto(data, remote_address)
44        await client.send_packet(response)

Using handle() Generator

Important

There will always be only one active generator per client. All the pending datagrams received while the generator is running are queued.

This behavior is designed to act like a stream request handler.

Minimum Requirements

 1async def handle(
 2    self,
 3    client: AsyncDatagramClient[Response],
 4) -> AsyncGenerator[None, Request]:
 5    ### Before 'yield'
 6    # Initializes the generator.
 7    # This is the setup part before receiving a request.
 8    # Unlike the stream request handler, the generator is started
 9    # when the datagram is received (but is not parsed yet).
10    ##################
11
12    request: Request = yield
13
14    ### After 'yield'
15    # The received datagram is parsed.
16    # you can do whatever you want with it and send responses back
17    # to the client if necessary.
18    await client.send_packet(Response())
19    #################
20
21    ### On a 'return'
22    # When handle() returns, it means that this request handler is finished.
23    # The server creates a new generator when a new datagram is received.
24    #################
25    return

Refuse datagrams

Your UDP socket can receive datagrams from anywhere. You may want to control who can send you information.

 1async def handle(
 2    self,
 3    client: AsyncDatagramClient[Response],
 4) -> AsyncGenerator[None, Request]:
 5    if not self.should_handle(client):
 6        # By returning before the "yield" statement, you ask the server to discard
 7        # the received datagram.
 8        return
 9
10    request: Request = yield

Error Handling

 1async def handle(
 2    self,
 3    client: AsyncDatagramClient[Response],
 4) -> AsyncGenerator[None, Request]:
 5    try:
 6        # *All* exceptions are thrown through the "yield" statement
 7        # (including BaseException). But you should only catch Exception subclasses.
 8        request: Request = yield
 9    except DatagramProtocolParseError:
10        await client.send_packet(BadRequest())
11    except Exception:
12        await client.send_packet(InternalError())
13    else:
14        await client.send_packet(Response())

Having Multiple yield Statements

 1async def handle(
 2    self,
 3    client: AsyncDatagramClient[Response],
 4) -> AsyncGenerator[None, Request]:
 5    request: Request = yield
 6
 7    ...
 8
 9    await client.send_packet(Response())
10
11    if self.need_something_else(request, client):
12        additional_data: Request = yield
13
14        ...
15
16        await client.send_packet(Response())

Warning

Even if this feature is supported, it is not recommended to have more than one (unless you know what you are doing) for the following reasons:

  • UDP does not guarantee ordered delivery. Packets are typically “sent” in order, but they may be received out of order. In large networks, it is reasonably common for some packets to arrive out of sequence (or not at all).

  • The server has no way of knowing if this client has stopped sending you requests forever.

If you plan to use multiple yields in your request handler, you should always have a timeout applied. (See the section below.)

Cancellation And Timeouts

Since all BaseException subclasses are thrown into the generator, you can apply a timeout to the read stream using the asynchronous framework (the cancellation exception is retrieved in the generator):

 1async def handle(
 2    self,
 3    client: AsyncDatagramClient[Response],
 4) -> AsyncGenerator[None, Request]:
 5    # It is *never* useful to have a timeout for the 1st datagram because the datagram
 6    # is already in the queue.
 7    request: Request = yield
 8
 9    ...
10
11    await client.send_packet(Response())
12
13    try:
14        async with asyncio.timeout(30):
15            # The client has 30 seconds to send the 2nd request to the server.
16            another_request: Request = yield
17    except TimeoutError:
18        await client.send_packet(TimedOut())
19    else:
20        await client.send_packet(Response())

Service Initialization

The server will call service_init() and pass it an AsyncExitStack at the beginning of the serve_forever() task to set up the global service.

This allows you to do something like this:

 1async def service_init(
 2    self,
 3    exit_stack: contextlib.AsyncExitStack,
 4    server: AsyncUDPNetworkServer[Request, Response],
 5) -> None:
 6    exit_stack.callback(self._service_quit)
 7
 8    self.background_tasks = await exit_stack.enter_async_context(asyncio.TaskGroup())
 9
10    _ = self.background_tasks.create_task(self._service_actions())
11
12async def _service_actions(self) -> None:
13    while True:
14        await asyncio.sleep(1)
15
16        # Do some stuff each second in background
17        ...
18
19def _service_quit(self) -> None:
20    print("Service stopped")

Server Object

A basic example of how to run the server:

 1from __future__ import annotations
 2
 3import asyncio
 4from collections.abc import AsyncGenerator
 5
 6from easynetwork.api_async.server import AsyncDatagramClient, AsyncDatagramRequestHandler, AsyncUDPNetworkServer
 7from easynetwork.protocol import DatagramProtocol
 8
 9
10class Request:
11    ...
12
13
14class Response:
15    ...
16
17
18class MyRequestHandler(AsyncDatagramRequestHandler[Request, Response]):
19    async def handle(
20        self,
21        client: AsyncDatagramClient[Response],
22    ) -> AsyncGenerator[None, Request]:
23        request: Request = yield
24
25        ...
26
27        await client.send_packet(Response())
28
29
30# NOTE: The sent packet is "Response" and the received packet is "Request"
31class ServerProtocol(DatagramProtocol[Response, Request]):
32    def __init__(self) -> None:
33        ...
34
35
36async def main() -> None:
37    host, port = "localhost", 9000
38    protocol = ServerProtocol()
39    handler = MyRequestHandler()
40
41    # Create the server, binding to localhost on port 9000
42    async with AsyncUDPNetworkServer(host, port, protocol, handler) as server:
43        # Activate the server; this will keep running until you
44        # interrupt the program with Ctrl-C
45        await server.serve_forever()
46
47
48if __name__ == "__main__":
49    asyncio.run(main())

See also

An Echo Client/Server Over UDP

A working example of the server implementation.