Alternative — Unix Datagram Client Endpoints
Note
This page uses two different API variants:
- Synchronous API with classic def functions, usable in any context.
- Asynchronous API with async def functions, using an asynchronous framework to perform I/O operations.
All asynchronous API examples assume that you are using either asyncio or trio,
but you can use a different library thanks to the asynchronous backend engine API.
The Protocol Object
The UNIX datagram clients expect a DatagramProtocol instance to communicate with the peer endpoint.
See also
- How-to — Communication Protocols
  Explains what a DatagramProtocol is and how to use it.
Connecting To Peer
Important
If you are familiar with UDP sockets, you know that there are no real connections (communication pipes) like there are with TCP sockets.
If not, I advise you to read the Unix manual pages udp(7) and connect(2).
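To make the "no real connection" point concrete, here is a minimal sketch that uses only the standard library socket module, not EasyNetwork. It assumes a platform with AF_UNIX support, and the socket file names are made up for the illustration. On a datagram socket, connect() only records the default destination; nothing is exchanged with the peer until the first send.

```python
import os
import socket
import tempfile


def demo_datagram_connect() -> bytes:
    """Send one datagram over a "connected" Unix datagram socket."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Hypothetical socket paths, created in a throwaway directory.
        server_path = os.path.join(tmpdir, "server.sock")
        client_path = os.path.join(tmpdir, "client.sock")

        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as server:
            server.bind(server_path)
            with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as client:
                # Bind the client too, so the server sees a return address.
                client.bind(client_path)
                # No handshake happens here: connect() only stores the peer address.
                client.connect(server_path)
                assert client.getpeername() == server_path

                client.send(b"ping")
                data, sender = server.recvfrom(1024)
                assert sender == client_path
                return data
```

The server never accepts anything: it simply receives a datagram whose sender address is the path the client was bound to.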
You need the listening socket’s filepath:
Synchronous API:
from __future__ import annotations

from easynetwork.clients.unix_datagram import UnixDatagramClient
from easynetwork.protocol import DatagramProtocol
from easynetwork.serializers import JSONSerializer


def main() -> None:
    protocol = DatagramProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    with UnixDatagramClient(address, protocol) as client:
        print(f"Remote address: {client.get_peer_name()}")

        ...


if __name__ == "__main__":
    main()
Note
The client does nothing when it enters the with context. Everything is done on object creation.
Asynchronous API:
from __future__ import annotations

import asyncio

from easynetwork.clients.async_unix_datagram import AsyncUnixDatagramClient
from easynetwork.protocol import DatagramProtocol
from easynetwork.serializers import JSONSerializer


async def main() -> None:
    protocol = DatagramProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    async with AsyncUnixDatagramClient(address, protocol) as client:
        print(f"Remote address: {client.get_peer_name()}")

        ...


if __name__ == "__main__":
    asyncio.run(main())
Note
The call to wait_connected() is required to actually initialize the client,
since we cannot perform asynchronous operations at object creation.
This is what the client does when it enters the async with context.
Once completed, wait_connected() is a no-op.
Using An Already Connected Socket
If you have your own way to obtain a connected socket.socket instance, you can pass it to the client.
If the socket is not connected, an OSError is raised.
Important
It must be a SOCK_DGRAM socket with AF_UNIX family.
Warning
The resource ownership is given to the client. You must close the client to close the socket.
Synchronous API:
from __future__ import annotations

import socket
import tempfile

from easynetwork.clients.unix_datagram import UnixDatagramClient
from easynetwork.protocol import DatagramProtocol
from easynetwork.serializers import JSONSerializer


def obtain_a_connected_socket() -> socket.socket:
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    sock.bind(f"{tempfile.mkdtemp()}/local.sock")

    ...

    return sock


def main() -> None:
    protocol = DatagramProtocol(JSONSerializer())
    sock = obtain_a_connected_socket()

    with UnixDatagramClient(sock, protocol) as client:
        print(f"Remote address: {client.get_peer_name()}")

        ...


if __name__ == "__main__":
    main()
Asynchronous API:
from __future__ import annotations

import asyncio
import socket
import tempfile

from easynetwork.clients.async_unix_datagram import AsyncUnixDatagramClient
from easynetwork.protocol import DatagramProtocol
from easynetwork.serializers import JSONSerializer


async def obtain_a_connected_socket() -> socket.socket:
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    sock.bind(f"{tempfile.mkdtemp()}/local.sock")

    ...

    return sock


async def main() -> None:
    protocol = DatagramProtocol(JSONSerializer())
    sock = await obtain_a_connected_socket()

    async with AsyncUnixDatagramClient(sock, protocol) as client:
        print(f"Remote address: {client.get_peer_name()}")

        ...


if __name__ == "__main__":
    asyncio.run(main())
Note
Even with a ready-to-use socket, the call to wait_connected() is still required.
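The requirements on a user-supplied socket (AF_UNIX family, SOCK_DGRAM type, already connected) can be checked up front with the standard library. The helper below is a hypothetical pre-check written for this page, not part of EasyNetwork, and it is not necessarily how the client validates its argument. It relies on getpeername() raising OSError on a socket that has no peer.

```python
import socket


def is_connected_unix_datagram(sock: socket.socket) -> bool:
    """Return True if sock looks usable as an already connected Unix datagram socket."""
    # Wrong family or type: not an AF_UNIX / SOCK_DGRAM socket.
    if sock.family != socket.AF_UNIX or sock.type != socket.SOCK_DGRAM:
        return False
    try:
        # Fails with OSError (ENOTCONN) when connect() was never called.
        sock.getpeername()
    except OSError:
        return False
    return True
```

A check like this does not change ownership rules: once the socket is handed to the client, closing the client is still what closes the socket.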
Sending Packets
Objects passed as arguments are automatically converted to bytes to send to the remote host thanks to the protocol object.
Synchronous API:
client.send_packet({"data": 42})
Asynchronous API:
await client.send_packet({"data": 42})
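As a rough picture of what "converted to bytes thanks to the protocol object" means with a JSON serializer, here is a standard-library sketch. The function names are hypothetical and the exact byte layout produced by JSONSerializer may differ (for example in whitespace or encoding options); the point is only that one Python object maps to one datagram payload and back.

```python
import json
from typing import Any


def serialize_packet(packet: Any) -> bytes:
    """Turn a JSON-compatible object into the payload of a single datagram."""
    # Compact separators keep the datagram small; this is a choice made for the sketch.
    return json.dumps(packet, separators=(",", ":")).encode("utf-8")


def deserialize_packet(datagram: bytes) -> Any:
    """Rebuild the object from a received datagram payload."""
    return json.loads(datagram.decode("utf-8"))
```

For instance, serialize_packet({"data": 42}) yields the bytes {"data":42}, and deserialize_packet() of those bytes gives back the original dictionary.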
Sending Packets with socket control messages
By using SocketAncillary, you can send SCM data. See the Unix manual page sendmsg(2) for details.
Synchronous API:
from easynetwork.lowlevel.socket import SocketAncillary

ancillary = SocketAncillary()
ancillary.add_fds([4])
client.send_packet({"data": 42}, ancillary_data=ancillary)
Asynchronous API:
from easynetwork.lowlevel.socket import SocketAncillary

ancillary = SocketAncillary()
ancillary.add_fds([4])
await client.send_packet({"data": 42}, ancillary_data=ancillary)
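For readers new to socket control messages, the sketch below shows what passing a file descriptor (an SCM_RIGHTS message) looks like with the standard library alone. It does not use EasyNetwork, and it assumes Python 3.9 or later on a Unix platform, where socket.send_fds() and socket.recv_fds() are available. The pipe is only there to prove that the received descriptor refers to the same open file.

```python
import os
import socket


def pass_fd_over_socketpair() -> bytes:
    """Send the write end of a pipe over a Unix datagram socket, then use it."""
    left, right = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
    read_fd, write_fd = os.pipe()
    try:
        # The descriptor travels as ancillary data next to the regular payload.
        socket.send_fds(left, [b"fd attached"], [write_fd])
        payload, fds, msg_flags, address = socket.recv_fds(right, 1024, 1)
        assert payload == b"fd attached"

        # The receiver gets its own descriptor number for the same pipe.
        received_fd = fds[0]
        try:
            os.write(received_fd, b"hello")
        finally:
            os.close(received_fd)
        return os.read(read_fd, 5)
    finally:
        os.close(read_fd)
        os.close(write_fd)
        left.close()
        right.close()
```

The receiving side owns the descriptors it gets and is responsible for closing them.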
Receiving Packets
You get the next available packet, already parsed.
Synchronous API:
packet = client.recv_packet()
print(f"Received packet: {packet!r}")
You can control the receive timeout with the timeout parameter:
try:
    packet = client.recv_packet(timeout=30)
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
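At the socket level, a receive timeout amounts to roughly the following. This is a standard-library sketch with a hypothetical helper name, not a description of how EasyNetwork implements the timeout parameter. It returns None instead of raising so that the two outcomes are easy to compare.

```python
import socket
from typing import Optional


def recv_with_timeout(sock: socket.socket, timeout: float) -> Optional[bytes]:
    """Wait at most `timeout` seconds for one datagram; return None on timeout."""
    previous = sock.gettimeout()
    sock.settimeout(timeout)
    try:
        return sock.recv(65536)
    except socket.timeout:
        # socket.timeout is an alias of TimeoutError on Python 3.10 and later.
        return None
    finally:
        # Restore whatever timeout the caller had configured.
        sock.settimeout(previous)
```

With a connected pair of datagram sockets, the call returns None while nothing has been sent, and returns the payload once the peer has sent a datagram.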
Asynchronous API:
packet = await client.recv_packet()
print(f"Received packet: {packet!r}")
You can control the receive timeout by adding a timeout scope using the asynchronous framework:
Using asyncio:
try:
    async with asyncio.timeout(30):
        packet = await client.recv_packet()
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
Using trio:
try:
    with trio.fail_after(30):
        packet = await client.recv_packet()
except trio.TooSlowError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
Using the asynchronous backend engine API:
try:
    with client.backend().timeout(30):
        packet = await client.recv_packet()
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
Tip
Remember to catch invalid data parsing errors.
Synchronous API:
try:
    packet = client.recv_packet(timeout=30)
except DatagramProtocolParseError:
    print("Received something, but was not valid")
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
Using asyncio:
try:
    async with asyncio.timeout(30):
        packet = await client.recv_packet()
except DatagramProtocolParseError:
    print("Received something, but was not valid")
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
Using trio:
try:
    with trio.fail_after(30):
        packet = await client.recv_packet()
except DatagramProtocolParseError:
    print("Received something, but was not valid")
except trio.TooSlowError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
Using the asynchronous backend engine API:
try:
    with client.backend().timeout(30):
        packet = await client.recv_packet()
except DatagramProtocolParseError:
    print("Received something, but was not valid")
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
Receiving Packets with socket control messages
By using SocketAncillary, you can receive SCM data. See the Unix manual page recvmsg(2) for details.
Synchronous API:
from easynetwork.lowlevel.socket import SCMCredentials, SCMRights, SocketAncillary

ancillary = SocketAncillary()
packet = client.recv_packet(ancillary_data=ancillary)
print(f"Received packet: {packet!r}")
for message in ancillary.messages():
    match message:
        case SCMRights(fds):
            for fd in fds:
                print(f"Received file descriptor: {fd}")
        case SCMCredentials(credentials):
            for ucred in credentials:
                print(f"Received unix credential: {ucred}")
Asynchronous API:
from easynetwork.lowlevel.socket import SCMCredentials, SCMRights, SocketAncillary

ancillary = SocketAncillary()
packet = await client.recv_packet(ancillary_data=ancillary)
print(f"Received packet: {packet!r}")
for message in ancillary.messages():
    match message:
        case SCMRights(fds):
            for fd in fds:
                print(f"Received file descriptor: {fd}")
        case SCMCredentials(credentials):
            for ucred in credentials:
                print(f"Received unix credential: {ucred}")
Tip
The default buffer size for this operation is approximately 8 KiB. However, you can customize this behavior.
Synchronous API:
from socket import CMSG_LEN

from easynetwork.lowlevel.socket import SocketAncillary

max_fds = 128
ancillary = SocketAncillary()
packet = client.recv_packet(ancillary_data=ancillary, ancillary_bufsize=CMSG_LEN(max_fds * 4))
print(f"Received packet: {packet!r}")
for fd in ancillary.iter_fds():
    print(f"Received file descriptor: {fd}")
Asynchronous API:
from socket import CMSG_LEN

from easynetwork.lowlevel.socket import SocketAncillary

max_fds = 128
ancillary = SocketAncillary()
packet = await client.recv_packet(ancillary_data=ancillary, ancillary_bufsize=CMSG_LEN(max_fds * 4))
print(f"Received packet: {packet!r}")
for fd in ancillary.iter_fds():
    print(f"Received file descriptor: {fd}")
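The max_fds * 4 expression in the example above presumably reflects the size of a C int, which is how file descriptors are laid out in an SCM_RIGHTS message. As a small sketch under that assumption, the size can be derived instead of hard-coded; the helper name is hypothetical and only the standard library is used.

```python
import array
import socket


def ancillary_bufsize_for_fds(max_fds: int) -> int:
    """Buffer size able to hold one control message carrying up to max_fds descriptors."""
    # Each descriptor is a C int; its size is 4 bytes on common platforms.
    fd_size = array.array("i").itemsize
    # CMSG_LEN adds the control message header to the payload length.
    return socket.CMSG_LEN(max_fds * fd_size)
```

The result is always larger than the raw payload (max_fds times the int size), because the control message header is included.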
Receiving Multiple Packets At Once
You can use iter_received_packets() to get all the received packets in a sequence or a set.
Synchronous API:
all_packets = [p for p in client.iter_received_packets()]
Asynchronous API:
all_packets = [p async for p in client.iter_received_packets()]
The timeout parameter defaults to zero to get only the data already in the buffer, but you can change it.
Synchronous API:
all_packets = [p for p in client.iter_received_packets(timeout=1)]
See also
- UnixDatagramClient.iter_received_packets()
  The method description and usage (especially for the timeout parameter).
Asynchronous API:
all_packets = [p async for p in client.iter_received_packets(timeout=1)]
See also
- AsyncUnixDatagramClient.iter_received_packets()
  The method description and usage (especially for the timeout parameter).
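The "timeout of zero" behaviour described above, which collects only what is already buffered, can be pictured with a plain socket as follows. This is a standard-library sketch with a hypothetical helper name; it illustrates the idea and is not the implementation of iter_received_packets().

```python
import socket


def drain_datagrams(sock: socket.socket, bufsize: int = 65536) -> list:
    """Return every datagram already queued on sock, without waiting for more."""
    packets = []
    previous_timeout = sock.gettimeout()
    sock.settimeout(0)  # a timeout of zero means "never wait"
    try:
        while True:
            try:
                packets.append(sock.recv(bufsize))
            except BlockingIOError:
                # The receive queue is empty: stop instead of blocking.
                break
    finally:
        sock.settimeout(previous_timeout)
    return packets
```

If the peer has sent three datagrams, one call returns those three payloads in order, and an immediate second call returns an empty list.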
Low-Level Socket Operations
For low-level operations such as setsockopt(), the client object exposes the socket through a SocketProxy:
Synchronous API:
client.socket.setsockopt(socket.SOL_SOCKET, socket.SO_DEBUG, True)
Asynchronous API:
client.socket.setsockopt(socket.SOL_SOCKET, socket.SO_DEBUG, True)
Warning
Make sure that wait_connected() has been called before.
Concurrency And Multithreading
All client methods are thread-safe. Synchronization follows these rules:
- send_packet() and recv_packet() do not share the same threading.Lock instance.
- close() will not wait for recv_packet().
- The client.socket methods are also thread-safe. This means that you cannot access the underlying socket methods (e.g. getsockopt()) during a write operation.
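The first rule above (separate locks for sending and receiving) is what lets one thread send while another is blocked waiting for data. The class below is a hypothetical wrapper around a plain socket, written only to illustrate that rule; it is not EasyNetwork's implementation.

```python
import socket
import threading


class LockedDatagramEndpoint:
    """Hypothetical wrapper using one lock per direction."""

    def __init__(self, sock: socket.socket) -> None:
        self._sock = sock
        self._send_lock = threading.Lock()
        self._recv_lock = threading.Lock()

    def send(self, data: bytes) -> None:
        with self._send_lock:
            self._sock.send(data)

    def recv(self, bufsize: int = 65536) -> bytes:
        with self._recv_lock:
            return self._sock.recv(bufsize)


def echo_roundtrip() -> bytes:
    """Send from the main thread while a reader thread waits in recv()."""
    local, remote = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
    with local, remote:
        endpoint = LockedDatagramEndpoint(local)
        received = []

        # The reader may already be blocked in recv(), holding the receive lock.
        reader = threading.Thread(target=lambda: received.append(endpoint.recv()))
        reader.start()

        # send() takes a different lock, so the waiting reader does not hold it up.
        endpoint.send(b"ping")
        remote.send(remote.recv(1024))  # the peer echoes the datagram back

        reader.join(timeout=5)
        return received[0]
```

With a single shared lock, the send could wait forever behind a reader that is itself waiting for the reply to that send.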
With the asynchronous API, client methods do not require external task synchronization (such as asyncio.Lock).
Synchronization follows these rules:
- send_packet() and recv_packet() do not share the same lock instance.
- aclose() will not wait for recv_packet().