Alternative — Unix Datagram Servers

Note

This page uses two different API variants:

  • Synchronous API with classic def functions, usable in any context.

  • Asynchronous API with async def functions, using an asynchronous framework to perform I/O operations.

All asynchronous API examples assume that you are using either asyncio or trio, but you can use a different library thanks to the asynchronous backend engine API.


Introduction

Creating a Unix datagram server requires several steps:

  1. Derive a class from AsyncDatagramRequestHandler and redefine its handle() method; this method will process incoming requests.

  2. Instantiate the AsyncUnixDatagramServer class passing it the server’s address, the protocol object and the request handler instance.

  3. Call serve_forever() to process requests.

Writing coroutine functions is mandatory to use this server.

See also

PEP 492 — Coroutines with async and await syntax

The proposal to introduce native coroutines in Python with async and await syntax.

asyncio — Asynchronous I/O

If you are not familiar with async/await syntax, you can use the standard library to get started with coroutines.

Request Handler Objects

Note

Unlike socketserver.BaseRequestHandler, there is only one AsyncDatagramRequestHandler instance for the entire service.

Here is a simple example:

 1from __future__ import annotations
 2
 3from collections.abc import AsyncGenerator
 4
 5from easynetwork.servers.handlers import AsyncDatagramClient, AsyncDatagramRequestHandler
 6
 7
 8class Request:
 9    """Object representing the client request."""
10
11    ...
12
13
14class Response:
15    """Object representing the response to send to the client."""
16
17    ...
18
19
20class MyRequestHandler(AsyncDatagramRequestHandler[Request, Response]):
21    """
22    The request handler class for our server.
23
24    It is instantiated once to the server, and must
25    override the handle() method to implement communication to the
26    client.
27    """
28
29    async def handle(
30        self,
31        client: AsyncDatagramClient[Response],
32    ) -> AsyncGenerator[None, Request]:
33        # "client" a placeholder to have a stream-like API.
34        # All the datagrams sent by this client are sent
35        # through the "yield" statement.
36        request: Request = yield
37
38        # Do some stuff
39        ...
40
41        response = Response()
42
43        # The corresponding call is server_socket.sendto(data, remote_address)
44        await client.send_packet(response)

Using handle() Generator

Important

There will always be only one active generator per client. All the pending datagrams received while the generator is running are queued.

This behavior is designed to act like a stream request handler.

Minimum Requirements

 1async def handle(
 2    self,
 3    client: AsyncDatagramClient[Response],
 4) -> AsyncGenerator[None, Request]:
 5    ### Before 'yield'
 6    # Initializes the generator.
 7    # This is the setup part before receiving a request.
 8    # Unlike the stream request handler, the generator is started
 9    # when the datagram is received (but is not parsed yet).
10    ##################
11
12    request: Request = yield
13
14    ### After 'yield'
15    # The received datagram is parsed.
16    # you can do whatever you want with it and send responses back
17    # to the client if necessary.
18    await client.send_packet(Response())
19    #################
20
21    ### On a 'return'
22    # When handle() returns, it means that this request handler is finished.
23    # The server creates a new generator when a new datagram is received.
24    #################
25    return

Refuse datagrams

Your UDP socket can receive datagrams from anyone with permission to send them. You may want to control who can send you information.

 1async def handle(
 2    self,
 3    client: AsyncDatagramClient[Response],
 4) -> AsyncGenerator[None, Request]:
 5    if not self.should_handle(client):
 6        # By returning before the "yield" statement, you ask the server to discard
 7        # the received datagram.
 8        return
 9
10    request: Request = yield

Error Handling

 1async def handle(
 2    self,
 3    client: AsyncDatagramClient[Response],
 4) -> AsyncGenerator[None, Request]:
 5    try:
 6        # *All* exceptions are thrown through the "yield" statement
 7        # (including BaseException). But you should only catch Exception subclasses.
 8        request: Request = yield
 9    except DatagramProtocolParseError:
10        await client.send_packet(BadRequest())
11    except Exception:
12        # Runtime error. Log the error.
13        traceback.print_exc()
14
15        await client.send_packet(InternalError())
16    else:
17        await client.send_packet(Response())

Warning

You should always log or re-raise a bare Exception thrown in your generator.

1except Exception:
2    # Runtime error. Log the error.
3    traceback.print_exc()
4
5    await client.send_packet(InternalError())

Having Multiple yield Statements

 1async def handle(
 2    self,
 3    client: AsyncDatagramClient[Response],
 4) -> AsyncGenerator[None, Request]:
 5    request: Request = yield
 6
 7    ...
 8
 9    await client.send_packet(Response())
10
11    if self.need_something_else(request, client):
12        additional_data: Request = yield
13
14        ...
15
16        await client.send_packet(Response())

Warning

Even if this feature is supported, it is not recommended to have more than one (unless you know what you are doing) for the following reasons:

  • UDP does not guarantee ordered delivery. Packets are typically “sent” in order, but they may be received out of order. In large networks, it is reasonably common for some packets to arrive out of sequence (or not at all).

  • The server has no way of knowing if this client has stopped sending you requests forever.

If you plan to use multiple yields in your request handler, you should always have a timeout applied. (See the section below.)

Cancellation And Timeouts

It is possible to send the timeout delay to the parent task:

 1async def handle(
 2    self,
 3    client: AsyncDatagramClient[Response],
 4) -> AsyncGenerator[RecvParams | None, Request]:
 5    # It is *never* useful to have a timeout for the 1st datagram
 6    # because the datagram is already in the queue.
 7    # The yielded value is simply ignored.
 8    request: Request = yield None
 9
10    ...
11
12    await client.send_packet(Response())
13
14    try:
15        # The client has 30 seconds to send the 2nd request to the server.
16        another_request: Request = yield RecvParams(timeout=30.0)
17    except TimeoutError:
18        await client.send_packet(TimedOut())
19    else:
20        await client.send_packet(Response())

Deprecated since version 1.2: Yielding a timeout without using RecvParams. Will be removed in 2.0.

1try:
2    # The client has 30 seconds to send the 2nd request to the server.
3    another_request: Request = yield 30
4except TimeoutError:
5    await client.send_packet(TimedOut())
6else:
7    await client.send_packet(Response())

Sending Packets with socket control messages

By using SocketAncillary, you can send SCM data. See the Unix manual page sendmsg(2) for details.

1async def handle(
2    self,
3    client: AsyncDatagramClient[Response],
4) -> AsyncGenerator[None, Request]:
5    request: Request = yield
6
7    ancillary = SocketAncillary()
8    ancillary.add_fds([4])
9    await client.send_packet_with_ancillary(Response(), ancillary)

Receiving Packets with socket control messages

By using RecvAncillaryDataParams and SocketAncillary, you can receive SCM data. See the Unix manual page recvmsg(2) for details.

Warning

You must enable the feature in the server configuration to make this work.

1server = AsyncUnixDatagramServer(
2    "/var/run/app.sock",
3    DatagramProtocol(JSONSerializer()),
4    SCMRecvRequestHandler(),
5    receive_ancillary_data=True,
6)
 1async def handle(
 2    self,
 3    client: AsyncDatagramClient[Response],
 4) -> AsyncGenerator[RecvParams | None, Request]:
 5    ancillary = SocketAncillary()
 6    request: Request = yield RecvParams(recv_with_ancillary=RecvAncillaryDataParams(ancillary.update_from_raw))
 7
 8    for message in ancillary.messages():
 9        match message:
10            case SCMRights(fds):
11                for fd in fds:
12                    print(f"Received file descriptor: {fd}")
13            case SCMCredentials(credentials):
14                for ucred in credentials:
15                    print(f"Received unix credential: {ucred}")

Tip

The default buffer size for this operation is approximately 8 KiB. However, you can customize this behavior.

 1from socket import CMSG_LEN
 2
 3max_fds = 128
 4server = AsyncUnixDatagramServer(
 5    "/var/run/app.sock",
 6    DatagramProtocol(JSONSerializer()),
 7    SCMRecvRequestHandler(),
 8    receive_ancillary_data=True,
 9    ancillary_bufsize=CMSG_LEN(max_fds * 4),
10)

Client Metadata

The client’s metadata are available via UNIXClientAttribute:

 1async def handle(
 2    self,
 3    client: AsyncDatagramClient[Response],
 4) -> AsyncGenerator[None, Request]:
 5    client_address = client.extra(UNIXClientAttribute.peer_name)
 6
 7    request: Request = yield
 8
 9    print(f"{client_address} sent {request}")
10
11    await client.send_packet(Response())

Service Initialization

The server will call service_init() and pass it an AsyncExitStack at the beginning of the serve_forever() task to set up the global service.

This allows you to do something like this:

 1async def service_init(
 2    self,
 3    exit_stack: contextlib.AsyncExitStack,
 4    server: AsyncUnixDatagramServer[Request, Response],
 5) -> None:
 6    exit_stack.callback(self._service_quit)
 7
 8    self.background_tasks = await exit_stack.enter_async_context(asyncio.TaskGroup())
 9
10    _ = self.background_tasks.create_task(self._service_actions())
11
12async def _service_actions(self) -> None:
13    while True:
14        await asyncio.sleep(1)
15
16        # Do some stuff each second in background
17        ...
18
19def _service_quit(self) -> None:
20    print("Service stopped")

Low-Level Socket Operations

For low-level operations such as setsockopt(), the server object exposes the sockets through a SocketProxy:

1async def service_init(
2    self,
3    exit_stack: contextlib.AsyncExitStack,
4    server: AsyncUnixDatagramServer[Request, Response],
5) -> None:
6    for sock in server.get_sockets():
7        # Enable SO_PASSCRED in order to use SCMCredentials
8        sock.setsockopt(SOL_SOCKET, SO_PASSCRED, 1)

Per-client variables (contextvars integration)

If your asynchronous framework supports per-task context variables, you can use this feature in your request handler:

 1class ClientContextRequestHandler(AsyncDatagramRequestHandler[Request, Response]):
 2    client_addr_var: ClassVar[contextvars.ContextVar[UnixSocketAddress]]
 3    client_addr_var = contextvars.ContextVar("client_addr")
 4
 5    @classmethod
 6    def client_log(cls, message: str) -> None:
 7        # The address of the currently handled client can be accessed
 8        # without passing it explicitly to this function.
 9
10        logger = logging.getLogger(cls.__name__)
11
12        client_address = cls.client_addr_var.get()
13
14        logger.info("From %s: %s", client_address, message)
15
16    async def handle(
17        self,
18        client: AsyncDatagramClient[Response],
19    ) -> AsyncGenerator[None, Request]:
20        address = client.extra(UNIXClientAttribute.peer_name)
21        self.client_addr_var.set(address)
22
23        # In any code that we call within "handle()" is now possible to get
24        # client's address by calling 'client_addr_var.get()'.
25
26        request: Request = yield
27
28        self.client_log(f"Received request: {request!r}")
29
30        await client.send_packet(Response())

Tip

It is possible to initialize the context to be copied in service_init().

This means that the contextvars.ContextVar.set() calls made in service_init() will be applied to subsequent client tasks.

Server Object

A basic example of how to run the server:

 1from __future__ import annotations
 2
 3import asyncio
 4from collections.abc import AsyncGenerator
 5
 6from easynetwork.protocol import DatagramProtocol
 7from easynetwork.servers.async_unix_datagram import AsyncUnixDatagramServer
 8from easynetwork.servers.handlers import AsyncDatagramClient, AsyncDatagramRequestHandler
 9
10
11class Request: ...
12
13
14class Response: ...
15
16
17class MyRequestHandler(AsyncDatagramRequestHandler[Request, Response]):
18    async def handle(
19        self,
20        client: AsyncDatagramClient[Response],
21    ) -> AsyncGenerator[None, Request]:
22        request: Request = yield
23
24        ...
25
26        await client.send_packet(Response())
27
28
29# NOTE: The sent packet is "Response" and the received packet is "Request"
30class ServerProtocol(DatagramProtocol[Response, Request]):
31    def __init__(self) -> None: ...
32
33
34async def main() -> None:
35    path = "/var/run/app/app.sock"
36    protocol = ServerProtocol()
37    handler = MyRequestHandler()
38
39    # Create the server, binding to /var/run/app/app.sock
40    async with AsyncUnixDatagramServer(path, protocol, handler) as server:
41        # Activate the server; this will keep running until you
42        # interrupt the program with Ctrl-C
43        await server.serve_forever()
44
45
46if __name__ == "__main__":
47    asyncio.run(main())