Alternative — Unix Stream Client Endpoints
Note
This page uses two different API variants:
- Synchronous API with classic def functions, usable in any context.
- Asynchronous API with async def functions, using an asynchronous framework to perform I/O operations.
All asynchronous API examples assume that you are using either asyncio or trio,
but you can use a different library thanks to the asynchronous backend engine API.
The Protocol Object
The UNIX stream clients expect a StreamProtocol instance to communicate with the peer endpoint.
See also
- How-to — Communication Protocols
Explains what a StreamProtocol is and how to use it.
Connecting To Peer
You need the listening socket’s filepath:
Synchronous API:
from __future__ import annotations

from easynetwork.clients.unix_stream import UnixStreamClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer


def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    with UnixStreamClient(address, protocol) as client:
        print(f"Remote address: {client.get_peer_name()}")

        ...


if __name__ == "__main__":
    main()
You can control the connection timeout with the connect_timeout parameter:
def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    try:
        client = UnixStreamClient(address, protocol, connect_timeout=30)
    except TimeoutError:
        print(f"Could not connect to {address} after 30 seconds")
        return

    with client:
        print(f"Remote address: {client.get_peer_name()}")

        ...
Note
The client does nothing when it enters the with context. Everything is done on object creation.
Asynchronous API:
from __future__ import annotations

import asyncio

from easynetwork.clients.async_unix_stream import AsyncUnixStreamClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer


async def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    async with AsyncUnixStreamClient(address, protocol) as client:
        print(f"Remote address: {client.get_peer_name()}")

        ...


if __name__ == "__main__":
    asyncio.run(main())
You can control the connection timeout by adding a timeout scope using the asynchronous framework:
With asyncio:
async def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    try:
        client = AsyncUnixStreamClient(address, protocol)
        async with asyncio.timeout(30):
            await client.wait_connected()
    except TimeoutError:
        print(f"Could not connect to {address} after 30 seconds")
        return

    async with client:
        print(f"Remote address: {client.get_peer_name()}")

        ...
With trio:
async def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    try:
        client = AsyncUnixStreamClient(address, protocol)
        with trio.fail_after(30):
            await client.wait_connected()
    except trio.TooSlowError:
        print(f"Could not connect to {address} after 30 seconds")
        return

    async with client:
        print(f"Remote address: {client.get_peer_name()}")

        ...
With the asynchronous backend API:
async def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    try:
        client = AsyncUnixStreamClient(address, protocol)
        with client.backend().timeout(30):
            await client.wait_connected()
    except TimeoutError:
        print(f"Could not connect to {address} after 30 seconds")
        return

    async with client:
        print(f"Remote address: {client.get_peer_name()}")

        ...
Note
The call to wait_connected() is required to actually initialize the client,
since we cannot perform asynchronous operations at object creation.
This is what the client does when it enters the async with context.
Once completed, wait_connected() is a no-op.
Using An Already Connected Socket
If you have your own way to obtain a connected socket.socket instance, you can pass it to the client.
If the socket is not connected, an OSError is raised.
Important
It must be a SOCK_STREAM socket with AF_UNIX family.
Warning
The resource ownership is given to the client. You must close the client to close the socket.
Synchronous API:
from __future__ import annotations

import socket

from easynetwork.clients.unix_stream import UnixStreamClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer


def obtain_a_connected_socket() -> socket.socket:
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    ...

    return sock


def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    sock = obtain_a_connected_socket()

    with UnixStreamClient(sock, protocol) as client:
        print(f"Remote address: {client.get_peer_name()}")

        ...


if __name__ == "__main__":
    main()
Asynchronous API:
from __future__ import annotations

import asyncio
import socket

from easynetwork.clients.async_unix_stream import AsyncUnixStreamClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer


async def obtain_a_connected_socket() -> socket.socket:
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    ...

    return sock


async def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    sock = await obtain_a_connected_socket()

    async with AsyncUnixStreamClient(sock, protocol) as client:
        print(f"Remote address: {client.get_peer_name()}")

        ...


if __name__ == "__main__":
    asyncio.run(main())
Note
Even with a ready-to-use socket, the call to wait_connected() is still required.
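Since the client raises an OSError for an unconnected socket and requires a SOCK_STREAM socket of the AF_UNIX family, it can help to check a socket before handing over ownership. The following is a minimal sketch using only the standard library; the helper name is_connected_unix_stream() is hypothetical and is not part of easynetwork.

```python
import socket


def is_connected_unix_stream(sock: socket.socket) -> bool:
    """Return True if sock is a connected AF_UNIX / SOCK_STREAM socket."""
    if sock.family != socket.AF_UNIX or sock.type != socket.SOCK_STREAM:
        return False
    try:
        sock.getpeername()  # raises OSError when the socket is not connected
    except OSError:
        return False
    return True


# socketpair() returns two sockets that are already connected to each other.
left, right = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
unconnected = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
datagram = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)

checks = (
    is_connected_unix_stream(left),
    is_connected_unix_stream(unconnected),
    is_connected_unix_stream(datagram),
)

for s in (left, right, unconnected, datagram):
    s.close()
```

Only the first socket passes the check; the other two are the cases the Important note and the OSError sentence above warn about.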
Sending Packets
The protocol object automatically converts the objects you pass as arguments into the bytes sent to the remote host.
Synchronous API:
client.send_packet({"data": 42})
Asynchronous API:
await client.send_packet({"data": 42})
Sending Packets with socket control messages
By using SocketAncillary, you can send socket control message (SCM) data, such as file descriptors. See the Unix manual page sendmsg(2) for details.
Synchronous API:
from easynetwork.lowlevel.socket import SocketAncillary

ancillary = SocketAncillary()
ancillary.add_fds([4])
client.send_packet({"data": 42}, ancillary_data=ancillary)
Asynchronous API:
from easynetwork.lowlevel.socket import SocketAncillary

ancillary = SocketAncillary()
ancillary.add_fds([4])
await client.send_packet({"data": 42}, ancillary_data=ancillary)
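To see what passing a file descriptor as control data means at the operating-system level, here is a rough sketch that uses only the standard library (socket.send_fds() and socket.recv_fds(), available on Unix since Python 3.9). It does not use easynetwork and is not meant to describe how SocketAncillary is implemented.

```python
import os
import socket

sender, receiver = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
read_end, write_end = os.pipe()

# Send one byte of regular data together with the pipe's read end (SCM_RIGHTS).
socket.send_fds(sender, [b"x"], [read_end])

# The receiver obtains its own descriptor number referring to the same pipe.
data, fds, _flags, _addr = socket.recv_fds(receiver, 1024, 1)
received_fd = fds[0]

# Data written to the pipe can be read through the received descriptor.
os.write(write_end, b"hello")
payload = os.read(received_fd, 5)

for fd in (read_end, write_end, received_fd):
    os.close(fd)
sender.close()
receiver.close()
```

The received descriptor is a duplicate owned by the receiver, so it must be closed separately from the one that was sent.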
Receiving Packets
You get the next available packet, already parsed. Extraneous data is kept for the next call.
Synchronous API:
packet = client.recv_packet()
print(f"Received packet: {packet!r}")
You can control the receive timeout with the timeout parameter:
try:
    packet = client.recv_packet(timeout=30)
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
Asynchronous API:
packet = await client.recv_packet()
print(f"Received packet: {packet!r}")
You can control the receive timeout by adding a timeout scope using the asynchronous framework:
With asyncio:
try:
    async with asyncio.timeout(30):
        packet = await client.recv_packet()
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
With trio:
try:
    with trio.fail_after(30):
        packet = await client.recv_packet()
except trio.TooSlowError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
With the asynchronous backend API:
try:
    with client.backend().timeout(30):
        packet = await client.recv_packet()
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
Tip
Remember to catch invalid data parsing errors.
Synchronous API:
from easynetwork.exceptions import StreamProtocolParseError

try:
    packet = client.recv_packet(timeout=30)
except StreamProtocolParseError:
    print("Received something, but was not valid")
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
Asynchronous API, with asyncio:
from easynetwork.exceptions import StreamProtocolParseError

try:
    async with asyncio.timeout(30):
        packet = await client.recv_packet()
except StreamProtocolParseError:
    print("Received something, but was not valid")
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
Asynchronous API, with trio:
from easynetwork.exceptions import StreamProtocolParseError

try:
    with trio.fail_after(30):
        packet = await client.recv_packet()
except StreamProtocolParseError:
    print("Received something, but was not valid")
except trio.TooSlowError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
Asynchronous API, with the asynchronous backend API:
from easynetwork.exceptions import StreamProtocolParseError

try:
    with client.backend().timeout(30):
        packet = await client.recv_packet()
except StreamProtocolParseError:
    print("Received something, but was not valid")
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
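The statement at the start of this section that extraneous data is kept for the next call, and the tip about parsing errors, can be pictured with a small standalone sketch. The LineFramedBuffer class below is hypothetical: it assumes newline-delimited JSON framing and is an illustration of the idea, not easynetwork's actual implementation.

```python
import json


class LineFramedBuffer:
    """Hypothetical helper: newline-delimited JSON frames over a byte stream."""

    def __init__(self) -> None:
        self._buffer = b""

    def feed(self, chunk: bytes) -> None:
        """Append bytes as they would arrive from the socket."""
        self._buffer += chunk

    def next_packet(self):
        """Return the next parsed packet, or None if no full frame is buffered."""
        line, sep, rest = self._buffer.partition(b"\n")
        if not sep:
            return None  # incomplete frame: wait for more data
        self._buffer = rest  # extraneous data is kept for the next call
        try:
            return json.loads(line)
        except ValueError as exc:
            raise ValueError(f"invalid frame: {line!r}") from exc


buf = LineFramedBuffer()
buf.feed(b'{"data": 42}\n{"data": 43}\n{"da')
first = buf.next_packet()
second = buf.next_packet()
third = buf.next_packet()  # only a partial frame remains in the buffer

buf.feed(b'ta": 44}\nnot json\n')
fourth = buf.next_packet()
try:
    buf.next_packet()  # the last frame is not valid JSON
except ValueError:
    parse_failed = True
else:
    parse_failed = False
```

A single chunk may carry several packets or only part of one, which is why a parse error can surface on any call and is worth catching each time.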
Receiving Packets with socket control messages
By using SocketAncillary, you can receive socket control message (SCM) data, such as file descriptors or peer credentials. See the Unix manual page recvmsg(2) for details.
Synchronous API:
from easynetwork.lowlevel.socket import SCMCredentials, SCMRights, SocketAncillary

ancillary = SocketAncillary()
packet = client.recv_packet(ancillary_data=ancillary)
print(f"Received packet: {packet!r}")
for message in ancillary.messages():
    match message:
        case SCMRights(fds):
            for fd in fds:
                print(f"Received file descriptor: {fd}")
        case SCMCredentials(credentials):
            for ucred in credentials:
                print(f"Received unix credential: {ucred}")
Asynchronous API:
from easynetwork.lowlevel.socket import SCMCredentials, SCMRights, SocketAncillary

ancillary = SocketAncillary()
packet = await client.recv_packet(ancillary_data=ancillary)
print(f"Received packet: {packet!r}")
for message in ancillary.messages():
    match message:
        case SCMRights(fds):
            for fd in fds:
                print(f"Received file descriptor: {fd}")
        case SCMCredentials(credentials):
            for ucred in credentials:
                print(f"Received unix credential: {ucred}")
Tip
The default buffer size for this operation is approximately 8 KiB. You can change it with the ancillary_bufsize parameter:
Synchronous API:
from socket import CMSG_LEN

from easynetwork.lowlevel.socket import SocketAncillary

max_fds = 128
ancillary = SocketAncillary()
# 4 is the size in bytes of one file descriptor (a C int on common platforms)
packet = client.recv_packet(ancillary_data=ancillary, ancillary_bufsize=CMSG_LEN(max_fds * 4))
print(f"Received packet: {packet!r}")
for fd in ancillary.iter_fds():
    print(f"Received file descriptor: {fd}")
Asynchronous API:
from socket import CMSG_LEN

from easynetwork.lowlevel.socket import SocketAncillary

max_fds = 128
ancillary = SocketAncillary()
# 4 is the size in bytes of one file descriptor (a C int on common platforms)
packet = await client.recv_packet(ancillary_data=ancillary, ancillary_bufsize=CMSG_LEN(max_fds * 4))
print(f"Received packet: {packet!r}")
for fd in ancillary.iter_fds():
    print(f"Received file descriptor: {fd}")
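As a worked illustration of the buffer-size arithmetic used above, the following sketch computes the ancillary buffer size for a given number of file descriptors with the standard library only. Deriving the integer size from the array module instead of hard-coding 4 is a choice made for this sketch, not something the original example does.

```python
import array
import socket

int_size = array.array("i").itemsize  # size of one C int, 4 bytes on common platforms
max_fds = 128

payload_size = max_fds * int_size
# CMSG_LEN: control message header plus payload, without trailing padding.
bufsize_len = socket.CMSG_LEN(payload_size)
# CMSG_SPACE: same, but including the padding needed if more items follow.
bufsize_space = socket.CMSG_SPACE(payload_size)
```

CMSG_LEN() is enough when a single control message is expected; CMSG_SPACE() is the safer bound when several may arrive together.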
Receiving Multiple Packets At Once
You can use iter_received_packets() to get all the received packets in a sequence or a set.
Synchronous API:
all_packets = [p for p in client.iter_received_packets()]
Asynchronous API:
all_packets = [p async for p in client.iter_received_packets()]
The timeout parameter defaults to zero to get only the data already in the buffer, but you can change it.
Synchronous API:
all_packets = [p for p in client.iter_received_packets(timeout=1)]
See also
- UnixStreamClient.iter_received_packets()
The method description and usage (especially for the timeout parameter).
Asynchronous API:
all_packets = [p async for p in client.iter_received_packets(timeout=1)]
See also
- AsyncUnixStreamClient.iter_received_packets()
The method description and usage (especially for the timeout parameter).
Close The Write-End Stream
If you are sure you will never reuse send_packet(), you can call send_eof() to shut down the write stream.
Synchronous API:
client.send_eof()
Asynchronous API:
await client.send_eof()
Note
send_eof() will block until all unsent data has been flushed before closing the stream.
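Shutting down the write stream is a half-close: the peer sees end-of-stream, but data can still flow in the other direction. The sketch below shows that behaviour with plain standard-library sockets; it assumes send_eof() is conceptually similar to socket.shutdown(SHUT_WR), which the text above does not state explicitly.

```python
import socket

client_side, server_side = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)

client_side.sendall(b"last request")
client_side.shutdown(socket.SHUT_WR)  # half-close: no more writes from this side

request = server_side.recv(1024)
eof = server_side.recv(1024)  # an empty bytes object signals end-of-stream

# The other direction is still open.
server_side.sendall(b"final reply")
reply = client_side.recv(1024)

client_side.close()
server_side.close()
```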
Peer Credentials
Get the peer’s process information with get_peer_credentials(). The call is identical for the synchronous and asynchronous clients:
peer_creds = client.get_peer_credentials()
print(f"pid={peer_creds.pid}, user_id={peer_creds.uid}, group_id={peer_creds.gid}")
Warning
With the asynchronous client, make sure that wait_connected() has been called beforehand.
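For context, this is roughly how peer credentials can be read from a Unix socket with the standard library on Linux, using the SO_PEERCRED socket option. It is a sketch under that platform assumption and does not claim to be how get_peer_credentials() is implemented; other systems use different mechanisms.

```python
import os
import socket
import struct
import sys

left, right = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)

peer = None
if sys.platform.startswith("linux") and hasattr(socket, "SO_PEERCRED"):
    # On Linux, struct ucred is laid out as: pid_t pid, uid_t uid, gid_t gid.
    ucred_format = "iII"
    raw = left.getsockopt(
        socket.SOL_SOCKET, socket.SO_PEERCRED, struct.calcsize(ucred_format)
    )
    peer = struct.unpack(ucred_format, raw)  # (pid, uid, gid)

left.close()
right.close()
```

Both ends of a socketpair() belong to the current process, so on Linux the tuple holds this process's own pid, uid and gid.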
Low-Level Socket Operations
For low-level operations such as setsockopt(), the client object exposes the socket through a SocketProxy. The call is identical for the synchronous and asynchronous clients:
import socket

client.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, False)
Warning
With the asynchronous client, make sure that wait_connected() has been called beforehand.
socket.recv() Buffer Size
By default, the client uses a reasonable buffer size when calling recv_packet().
You can control this value by setting the max_recv_size parameter:
Synchronous API:
with UnixStreamClient(address, protocol, max_recv_size=1024) as client:
    # Only do socket.recv(1024) calls
    packet = client.recv_packet()
Asynchronous API:
async with AsyncUnixStreamClient(address, protocol, max_recv_size=1024) as client:
    # Only do socket.recv(1024) calls
    packet = await client.recv_packet()
Note
max_recv_size is also used as a size hint for buffered serializers.
See this section for details.
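To illustrate what capping the size of each socket.recv() call implies, here is a small standard-library sketch: a 4096-byte message read with a 1024-byte limit needs several calls. The variable max_recv_size below is just a local name mirroring the parameter; the sketch does not use easynetwork.

```python
import socket

sender, receiver = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
sender.sendall(b"x" * 4096)
sender.close()  # lets the receiver see end-of-stream after the data

max_recv_size = 1024
chunk_sizes = []
while True:
    chunk = receiver.recv(max_recv_size)  # never returns more than max_recv_size bytes
    if not chunk:
        break  # empty result: the peer closed the connection
    chunk_sizes.append(len(chunk))
receiver.close()
```

A smaller limit lowers the memory used per call at the cost of more system calls for large packets.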
Concurrency And Multithreading
Synchronous API:
All client methods are thread-safe. Synchronization follows these rules:
- send_packet() and recv_packet() do not share the same threading.Lock instance.
- close() will not wait for recv_packet().
- The client.socket methods are also thread-safe. This means that you cannot access the underlying socket methods (e.g. getsockopt()) during a write operation.
Asynchronous API:
Client methods do not require external task synchronization (such as asyncio.Lock).
Synchronization follows these rules:
- send_packet() and recv_packet() do not share the same lock instance.
- aclose() will not wait for recv_packet().
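The rule that sending and receiving do not share a lock can be sketched with a tiny wrapper around a standard-library socket. LockedEndpoint is a hypothetical class written for this illustration; it only mirrors the locking rule described above and is not easynetwork's code.

```python
import socket
import threading


class LockedEndpoint:
    """Hypothetical wrapper with one lock per direction."""

    def __init__(self, sock: socket.socket) -> None:
        self._sock = sock
        self._send_lock = threading.Lock()
        self._recv_lock = threading.Lock()

    def send(self, data: bytes) -> None:
        with self._send_lock:
            self._sock.sendall(data)

    def recv(self, bufsize: int) -> bytes:
        with self._recv_lock:
            return self._sock.recv(bufsize)


left, right = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
endpoint = LockedEndpoint(left)
received = []

# The reader thread waits in recv() while holding only the receive lock.
reader = threading.Thread(target=lambda: received.append(endpoint.recv(1024)))
reader.start()

# Sending from the main thread is not blocked by the pending recv().
endpoint.send(b"ping")
echoed = right.recv(1024)
right.sendall(b"pong")  # this is what finally unblocks the reader

reader.join(timeout=5)
left.close()
right.close()
```

With a single shared lock, the send() call could be stuck behind the pending recv(), which would in turn never receive its reply.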