Alternative — Unix Stream Client Endpoints
Note
This page uses two different API variants:
- Synchronous API with classic def functions, usable in any context.
- Asynchronous API with async def functions, using an asynchronous framework to perform I/O operations.
All asynchronous API examples assume that you are using either asyncio or trio,
but you can use a different library thanks to the asynchronous backend engine API.
The Protocol Object
The UNIX stream clients expect a StreamProtocol instance to communicate with the peer endpoint.
See also
- How-to — Communication Protocols
Explains what a StreamProtocol is and how to use it.
Connecting To Peer
You need the listening socket’s filepath:
from __future__ import annotations

from easynetwork.clients.unix_stream import UnixStreamClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer


def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    with UnixStreamClient(address, protocol) as client:
        print(f"Remote address: {client.get_peer_name()}")

        ...


if __name__ == "__main__":
    main()
You can control the connection timeout with the connect_timeout parameter:
def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    try:
        client = UnixStreamClient(address, protocol, connect_timeout=30)
    except TimeoutError:
        print(f"Could not connect to {address} after 30 seconds")
        return

    with client:
        print(f"Remote address: {client.get_peer_name()}")

        ...
Note
The client does nothing when it enters the with context. Everything is done on object creation.
from __future__ import annotations

import asyncio

from easynetwork.clients.async_unix_stream import AsyncUnixStreamClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer


async def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    async with AsyncUnixStreamClient(address, protocol) as client:
        print(f"Remote address: {client.get_peer_name()}")

        ...


if __name__ == "__main__":
    asyncio.run(main())
You can control the connection timeout by adding a timeout scope using the asynchronous framework:
# Variant using asyncio (asyncio.timeout() requires Python 3.11+)
async def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    try:
        client = AsyncUnixStreamClient(address, protocol)
        async with asyncio.timeout(30):
            await client.wait_connected()
    except TimeoutError:
        print(f"Could not connect to {address} after 30 seconds")
        return

    async with client:
        print(f"Remote address: {client.get_peer_name()}")

        ...

# Variant using trio (requires "import trio")
async def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    try:
        client = AsyncUnixStreamClient(address, protocol)
        with trio.fail_after(30):
            await client.wait_connected()
    except trio.TooSlowError:
        print(f"Could not connect to {address} after 30 seconds")
        return

    async with client:
        print(f"Remote address: {client.get_peer_name()}")

        ...

# Framework-agnostic variant, using the timeout scope of the client's backend
async def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    try:
        client = AsyncUnixStreamClient(address, protocol)
        with client.backend().timeout(30):
            await client.wait_connected()
    except TimeoutError:
        print(f"Could not connect to {address} after 30 seconds")
        return

    async with client:
        print(f"Remote address: {client.get_peer_name()}")

        ...
Note
The call to wait_connected() is required to actually initialize the client,
since we cannot perform asynchronous operations at object creation.
This is what the client does when it enters the async with context.
Once completed, wait_connected() is a no-op.
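To try the connection examples above, something has to be listening on the socket path. The following is a minimal sketch of a throwaway echo endpoint built only with the standard library. The helper names (serve_once, echo_roundtrip) and the temporary path are hypothetical and are not part of easynetwork; the client side here uses a plain socket rather than UnixStreamClient so that the block stays self-contained.

```python
from __future__ import annotations

import os
import socket
import tempfile
import threading


def serve_once(path: str, ready: threading.Event) -> None:
    """Accept one connection on `path` and echo back everything received."""
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as server:
        server.bind(path)
        server.listen(1)
        ready.set()  # tell the caller that the path can now be connected to
        conn, _ = server.accept()
        with conn:
            while chunk := conn.recv(1024):  # b"" means the peer closed
                conn.sendall(chunk)


def echo_roundtrip(payload: bytes) -> bytes:
    """Start the echo endpoint in a thread, send `payload`, return the echo."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, "app.sock")  # hypothetical socket path
        ready = threading.Event()
        thread = threading.Thread(target=serve_once, args=(path, ready), daemon=True)
        thread.start()
        ready.wait(timeout=5)

        received = b""
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
            client.settimeout(5)  # applies to connect() and to each recv()
            client.connect(path)
            client.sendall(payload)
            while len(received) < len(payload):
                chunk = client.recv(1024)
                if not chunk:
                    break
                received += chunk
        thread.join(timeout=5)
        return received
```

With such an endpoint running, the address passed to the client would be the same path the server bound to.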
Using An Already Connected Socket
If you have your own way to obtain a connected socket.socket instance, you can pass it to the client.
If the socket is not connected, an OSError is raised.
Important
It must be a SOCK_STREAM socket with AF_UNIX family.
Warning
The resource ownership is given to the client. You must close the client to close the socket.
from __future__ import annotations

import socket

from easynetwork.clients.unix_stream import UnixStreamClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer


def obtain_a_connected_socket() -> socket.socket:
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    ...

    return sock


def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    sock = obtain_a_connected_socket()

    with UnixStreamClient(sock, protocol) as client:
        print(f"Remote address: {client.get_peer_name()}")

        ...


if __name__ == "__main__":
    main()
from __future__ import annotations

import asyncio
import socket

from easynetwork.clients.async_unix_stream import AsyncUnixStreamClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer


async def obtain_a_connected_socket() -> socket.socket:
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    ...

    return sock


async def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    sock = await obtain_a_connected_socket()

    async with AsyncUnixStreamClient(sock, protocol) as client:
        print(f"Remote address: {client.get_peer_name()}")

        ...


if __name__ == "__main__":
    asyncio.run(main())
Note
Even with a ready-to-use socket, the call to wait_connected() is still required.
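The requirements stated in this section (AF_UNIX family, SOCK_STREAM type, already connected) can be checked up front with the standard library. This is only a sketch of one possible pre-flight check; the function name is hypothetical, and how the client itself validates the socket is not described here.

```python
import socket


def is_connected_unix_stream(sock: socket.socket) -> bool:
    """Return True if `sock` is an AF_UNIX / SOCK_STREAM socket with a peer."""
    if sock.family != socket.AF_UNIX or sock.type != socket.SOCK_STREAM:
        return False
    try:
        # getpeername() fails with OSError when the socket has no peer.
        sock.getpeername()
    except OSError:
        return False
    return True
```

For instance, both ends of socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM) pass this check, while a freshly created, unconnected socket does not.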
Sending Packets
Objects passed to send_packet() are automatically converted to bytes by the protocol object before being sent to the remote host.
Warning
Sending socket control messages (with sendmsg(2)) is not supported yet.
# Synchronous client
client.send_packet({"data": 42})
# Asynchronous client
await client.send_packet({"data": 42})
Receiving Packets
You get the next available packet, already parsed. Extraneous data is kept for the next call.
Warning
Receiving socket control messages (with recvmsg(2)) is not supported yet.
packet = client.recv_packet()
print(f"Received packet: {packet!r}")
You can control the receive timeout with the timeout parameter:
try:
    packet = client.recv_packet(timeout=30)
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
packet = await client.recv_packet()
print(f"Received packet: {packet!r}")
You can control the receive timeout by adding a timeout scope using the asynchronous framework:
# Variant using asyncio (asyncio.timeout() requires Python 3.11+)
try:
    async with asyncio.timeout(30):
        packet = await client.recv_packet()
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")

# Variant using trio (requires "import trio")
try:
    with trio.fail_after(30):
        packet = await client.recv_packet()
except trio.TooSlowError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")

# Framework-agnostic variant, using the timeout scope of the client's backend
try:
    with client.backend().timeout(30):
        packet = await client.recv_packet()
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
Tip
Remember to catch invalid data parsing errors.
# Synchronous client
# (StreamProtocolParseError is assumed to be imported from easynetwork.exceptions)
try:
    packet = client.recv_packet(timeout=30)
except StreamProtocolParseError:
    print("Received something, but was not valid")
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")

# Asynchronous client, variant using asyncio
try:
    async with asyncio.timeout(30):
        packet = await client.recv_packet()
except StreamProtocolParseError:
    print("Received something, but was not valid")
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")

# Asynchronous client, variant using trio
try:
    with trio.fail_after(30):
        packet = await client.recv_packet()
except StreamProtocolParseError:
    print("Received something, but was not valid")
except trio.TooSlowError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")

# Asynchronous client, framework-agnostic variant
try:
    with client.backend().timeout(30):
        packet = await client.recv_packet()
except StreamProtocolParseError:
    print("Received something, but was not valid")
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
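The statement that extraneous data is kept for the next call can be illustrated without the library. The sketch below is not how easynetwork is implemented: it assumes a hypothetical framing of one JSON document per line, and the LineJSONReader class is invented for this illustration. It shows the general idea of a receive buffer that returns one packet and retains any bytes that follow it.

```python
from __future__ import annotations

import json
import socket
from typing import Any


class LineJSONReader:
    """Hypothetical reader: one JSON document per line, leftovers are kept."""

    def __init__(self, sock: socket.socket, max_recv_size: int = 1024) -> None:
        self._sock = sock
        self._max_recv_size = max_recv_size
        self._buffer = b""

    def recv_packet(self) -> Any:
        # Only touch the socket when the buffer lacks a complete line.
        while b"\n" not in self._buffer:
            chunk = self._sock.recv(self._max_recv_size)
            if not chunk:
                raise ConnectionAbortedError("peer closed the stream mid-packet")
            self._buffer += chunk
        # Take the first line; everything after it stays for the next call.
        line, _, self._buffer = self._buffer.partition(b"\n")
        return json.loads(line)
```

If the peer writes two packets back to back, the first recv_packet() call may read both from the socket, and the second call is then served from the buffer alone.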
Receiving Multiple Packets At Once
You can use iter_received_packets() to get all the received packets in a sequence or a set.
# Synchronous client
all_packets = [p for p in client.iter_received_packets()]
# Asynchronous client
all_packets = [p async for p in client.iter_received_packets()]
The timeout parameter defaults to zero to get only the data already in the buffer, but you can change it.
all_packets = [p for p in client.iter_received_packets(timeout=1)]
See also
- UnixStreamClient.iter_received_packets()
The method description and usage (especially for the timeout parameter).
all_packets = [p async for p in client.iter_received_packets(timeout=1)]
See also
- AsyncUnixStreamClient.iter_received_packets()
The method description and usage (especially for the timeout parameter).
Close The Write-End Stream
If you are sure you will never call send_packet() again, you can call send_eof() to shut down the write stream.
# Synchronous client
client.send_eof()
# Asynchronous client
await client.send_eof()
Note
send_eof() will block until all unsent data has been flushed before closing the stream.
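At the socket level, closing only the write end is commonly done with shutdown(SHUT_WR). The sketch below assumes that send_eof() has a comparable effect, which is not confirmed here; it uses a plain socket pair and a hypothetical helper name to show that the peer sees end-of-file while the other direction remains usable.

```python
from __future__ import annotations

import socket


def half_close_demo() -> tuple[bytes, bytes]:
    """Send a request, close the write end, then still receive a reply."""
    client, server = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
    with client, server:
        client.sendall(b"last request")
        client.shutdown(socket.SHUT_WR)  # no further writes from the client

        request = b""
        while chunk := server.recv(1024):  # b"" signals end-of-file
            request += chunk

        # The read direction of the client is unaffected by the half-close.
        server.sendall(b"final reply")
        reply = client.recv(1024)
    return request, reply
```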
Peer Credentials
Get the peer’s process information with get_peer_credentials():
# Synchronous client
peer_creds = client.get_peer_credentials()
print(f"pid={peer_creds.pid}, user_id={peer_creds.uid}, group_id={peer_creds.gid}")
# Asynchronous client
peer_creds = client.get_peer_credentials()
print(f"pid={peer_creds.pid}, user_id={peer_creds.uid}, group_id={peer_creds.gid}")
Warning
Make sure that wait_connected() has been called before.
Warning
The socket option SO_PASSCRED on Linux has no effect for now. This will be handled in the future.
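For background, peer credentials on Linux are usually obtained through the SO_PEERCRED socket option, which returns a struct ucred holding the pid, uid, and gid. The sketch below is Linux-only and is an assumption about the underlying mechanism, not a description of how get_peer_credentials() is implemented; PeerCreds and read_peer_credentials are hypothetical names. Other platforms use different mechanisms.

```python
from __future__ import annotations

import socket
import struct
from typing import NamedTuple


class PeerCreds(NamedTuple):
    pid: int
    uid: int
    gid: int


# struct ucred on Linux: pid_t (signed int), uid_t and gid_t (unsigned int).
_UCRED_FORMAT = "iII"


def read_peer_credentials(sock: socket.socket) -> PeerCreds:
    """Linux-only: read the peer's credentials with SO_PEERCRED."""
    raw = sock.getsockopt(
        socket.SOL_SOCKET,
        socket.SO_PEERCRED,
        struct.calcsize(_UCRED_FORMAT),
    )
    return PeerCreds(*struct.unpack(_UCRED_FORMAT, raw))
```

For a socket created with socket.socketpair(), the reported credentials are those of the process that created the pair.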
Low-Level Socket Operations
For low-level operations such as setsockopt(), the client object exposes the socket through a SocketProxy:
# Synchronous client
client.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, False)
# Asynchronous client
client.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, False)
Warning
Make sure that wait_connected() has been called before.
socket.recv() Buffer Size
By default, the client uses a reasonable buffer size when calling recv_packet().
You can control this value by setting the max_recv_size parameter:
# Synchronous client
with UnixStreamClient(address, protocol, max_recv_size=1024) as client:
    # Only do socket.recv(1024) calls
    packet = client.recv_packet()

# Asynchronous client
async with AsyncUnixStreamClient(address, protocol, max_recv_size=1024) as client:
    # Only do socket.recv(1024) calls
    packet = await client.recv_packet()
Note
max_recv_size is also used as a size hint for buffered serializers.
See this section for details.
Concurrency And Multithreading
All client methods are thread-safe. Synchronization follows these rules:
- send_packet() and recv_packet() do not share the same threading.Lock instance.
- close() will not wait for recv_packet().
- The client.socket methods are also thread-safe. This means that you cannot access the underlying socket methods (e.g. getsockopt()) during a write operation.
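The first rule above means that a thread blocked in recv_packet() does not prevent another thread from sending. A minimal sketch of that locking scheme, using a hypothetical LockedSocket wrapper around a plain socket (not easynetwork's actual implementation), could look like this:

```python
import socket
import threading


class LockedSocket:
    """Hypothetical wrapper with independent locks for sending and receiving."""

    def __init__(self, sock: socket.socket) -> None:
        self._sock = sock
        self._send_lock = threading.Lock()
        self._recv_lock = threading.Lock()

    def send(self, data: bytes) -> None:
        # Serializes concurrent senders, but never waits for a receiver.
        with self._send_lock:
            self._sock.sendall(data)

    def recv(self, size: int) -> bytes:
        # Serializes concurrent receivers, but never waits for a sender.
        with self._recv_lock:
            return self._sock.recv(size)
```

With a single shared lock instead, a receiver waiting for data would hold the lock and block every sender until data arrived.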
No client method requires external task synchronization (such as asyncio.Lock).
Synchronization follows these rules:
- send_packet() and recv_packet() do not share the same lock instance.
- aclose() will not wait for recv_packet().