Alternative — Unix Datagram Client Endpoints

Note

This page uses two different API variants:

  • Synchronous API with classic def functions, usable in any context.

  • Asynchronous API with async def functions, using an asynchronous framework to perform I/O operations.

All asynchronous API examples assume that you are using either asyncio or trio, but you can use a different library thanks to the asynchronous backend engine API.


The Protocol Object

The UNIX datagram clients expect a DatagramProtocol instance to communicate with the peer endpoint.

See also

How-to — Communication Protocols

Explains what a DatagramProtocol is and how to use it.

Connecting To Peer

Important

If you are familiar with UDP sockets, you know that there are no real connections (communication pipes) like there are with TCP sockets.

If not, I advise you to read the Unix manual pages udp(7) and connect(2).

You need the listening socket’s filepath:

 1from __future__ import annotations
 2
 3from easynetwork.clients.unix_datagram import UnixDatagramClient
 4from easynetwork.protocol import DatagramProtocol
 5from easynetwork.serializers import JSONSerializer
 6
 7
 8def main() -> None:
 9    protocol = DatagramProtocol(JSONSerializer())
10    address = "/var/run/app/app.sock"
11
12    with UnixDatagramClient(address, protocol) as client:
13        print(f"Remote address: {client.get_peer_name()}")
14
15        ...
16
17
18if __name__ == "__main__":
19    main()

Note

The client does nothing when it enters the with context. Everything is done on object creation.

Using An Already Connected Socket

If you have your own way to obtain a connected socket.socket instance, you can pass it to the client.

If the socket is not connected, an OSError is raised.

Important

It must be a SOCK_DGRAM socket with AF_UNIX family.

Warning

The resource ownership is given to the client. You must close the client to close the socket.

 1from __future__ import annotations
 2
 3import socket
 4import tempfile
 5
 6from easynetwork.clients.unix_datagram import UnixDatagramClient
 7from easynetwork.protocol import DatagramProtocol
 8from easynetwork.serializers import JSONSerializer
 9
10
11def obtain_a_connected_socket() -> socket.socket:
12    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
13    sock.bind(f"{tempfile.mkdtemp()}/local.sock")
14
15    ...
16
17    return sock
18
19
20def main() -> None:
21    protocol = DatagramProtocol(JSONSerializer())
22    sock = obtain_a_connected_socket()
23
24    with UnixDatagramClient(sock, protocol) as client:
25        print(f"Remote address: {client.get_peer_name()}")
26
27        ...
28
29
30if __name__ == "__main__":
31    main()

Sending Packets

Objects passed as arguments are automatically converted to bytes to send to the remote host thanks to the protocol object.

1client.send_packet({"data": 42})

Sending Packets with socket control messages

By using SocketAncillary, you can send SCM data. See the Unix manual page sendmsg(2) for details.

1from easynetwork.lowlevel.socket import SocketAncillary
2
3ancillary = SocketAncillary()
4ancillary.add_fds([4])
5client.send_packet({"data": 42}, ancillary_data=ancillary)

Receiving Packets

You get the next available packet, already parsed.

1packet = client.recv_packet()
2print(f"Received packet: {packet!r}")

You can control the receive timeout with the timeout parameter:

1try:
2    packet = client.recv_packet(timeout=30)
3except TimeoutError:
4    print("Timed out")
5else:
6    print(f"Received packet: {packet!r}")

Tip

Remember to catch invalid data parsing errors.

1try:
2    packet = client.recv_packet(timeout=30)
3except DatagramProtocolParseError:
4    print("Received something, but was not valid")
5except TimeoutError:
6    print("Timed out")
7else:
8    print(f"Received packet: {packet!r}")

Receiving Packets with socket control messages

By using SocketAncillary, you can receive SCM data. See the Unix manual page recvmsg(2) for details.

 1from easynetwork.lowlevel.socket import SCMCredentials, SCMRights, SocketAncillary
 2
 3ancillary = SocketAncillary()
 4packet = client.recv_packet(ancillary_data=ancillary)
 5print(f"Received packet: {packet!r}")
 6for message in ancillary.messages():
 7    match message:
 8        case SCMRights(fds):
 9            for fd in fds:
10                print(f"Received file descriptor: {fd}")
11        case SCMCredentials(credentials):
12            for ucred in credentials:
13                print(f"Received unix credential: {ucred}")

Tip

The default buffer size for this operation is approximately 8 KiB. However, you can customize this behavior.

 1from socket import CMSG_LEN
 2
 3from easynetwork.lowlevel.socket import SocketAncillary
 4
 5max_fds = 128
 6ancillary = SocketAncillary()
 7packet = client.recv_packet(ancillary_data=ancillary, ancillary_bufsize=CMSG_LEN(max_fds * 4))
 8print(f"Received packet: {packet!r}")
 9for fd in ancillary.iter_fds():
10    print(f"Received file descriptor: {fd}")

Receiving Multiple Packets At Once

You can use iter_received_packets() to get all the received packets in a sequence or a set.

1all_packets = [p for p in client.iter_received_packets()]

The timeout parameter defaults to zero to get only the data already in the buffer, but you can change it.

1all_packets = [p for p in client.iter_received_packets(timeout=1)]

See also

UnixDatagramClient.iter_received_packets()

The method description and usage (especially for the timeout parameter).

Low-Level Socket Operations

For low-level operations such as setsockopt(), the client object exposes the socket through a SocketProxy:

1client.socket.setsockopt(socket.SOL_SOCKET, socket.SO_DEBUG, True)

Concurrency And Multithreading

All client methods are thread-safe. Synchronization follows these rules: