Alternative — Unix Stream Client Endpoints

Note

This page uses two different API variants:

  • Synchronous API with classic def functions, usable in any context.

  • Asynchronous API with async def functions, using an asynchronous framework to perform I/O operations.

All asynchronous API examples assume that you are using either asyncio or trio, but you can use a different library thanks to the asynchronous backend engine API.


The Protocol Object

The UNIX stream clients expect a StreamProtocol instance to communicate with the peer endpoint.

See also

How-to — Communication Protocols

Explains what a StreamProtocol is and how to use it.

Connecting To Peer

You need the listening socket’s filepath:

from __future__ import annotations

from easynetwork.clients.unix_stream import UnixStreamClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer


def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    with UnixStreamClient(address, protocol) as client:
        print(f"Remote address: {client.get_peer_name()}")

        ...


if __name__ == "__main__":
    main()

You can control the connection timeout with the connect_timeout parameter:

def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = "/var/run/app/app.sock"

    try:
        client = UnixStreamClient(address, protocol, connect_timeout=30)
    except TimeoutError:
        print(f"Could not connect to {address} after 30 seconds")
        return

    with client:
        print(f"Remote address: {client.get_peer_name()}")

        ...

Note

The client does nothing when it enters the with context. Everything is done on object creation.
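To try the connection examples without an existing server, a throwaway listener can be created with the standard library alone. The following is a minimal sketch that does not use EasyNetwork: the helper names (start_throwaway_listener, connect_with_timeout, demo) and the temporary socket path are hypothetical, and the raw-socket timeout only illustrates the idea behind connect_timeout, not how the library implements it.

```python
from __future__ import annotations

import os
import socket
import tempfile


def start_throwaway_listener(path: str) -> socket.socket:
    """Bind a Unix stream socket to *path* and start listening on it."""
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        server.bind(path)
        server.listen(1)
    except BaseException:
        server.close()
        raise
    return server


def connect_with_timeout(path: str, timeout: float) -> socket.socket:
    """Connect to the listener at *path*, giving up after *timeout* seconds.

    A timed-out connect raises socket.timeout, which is an alias of
    TimeoutError since Python 3.10.
    """
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.settimeout(timeout)
        sock.connect(path)
        sock.settimeout(None)  # back to blocking mode once connected
    except BaseException:
        sock.close()
        raise
    return sock


def demo() -> str:
    """Connect to a temporary listener and return the peer address seen by the client."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, "app.sock")  # hypothetical path, for illustration only
        with start_throwaway_listener(path):
            with connect_with_timeout(path, timeout=5.0) as sock:
                return sock.getpeername()
```

The path returned by demo() is the same kind of filepath that would be passed as the address argument in the examples above.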

Using An Already Connected Socket

If you have your own way to obtain a connected socket.socket instance, you can pass it to the client.

If the socket is not connected, an OSError is raised.

Important

It must be a SOCK_STREAM socket with AF_UNIX family.

Warning

The resource ownership is given to the client. You must close the client to close the socket.

from __future__ import annotations

import socket

from easynetwork.clients.unix_stream import UnixStreamClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer


def obtain_a_connected_socket() -> socket.socket:
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

    ...

    return sock


def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    sock = obtain_a_connected_socket()

    with UnixStreamClient(sock, protocol) as client:
        print(f"Remote address: {client.get_peer_name()}")

        ...


if __name__ == "__main__":
    main()
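The requirements above (AF_UNIX family, SOCK_STREAM type, already connected) can be checked on a raw socket before handing it over. This is a sketch using only the standard library; is_connected_unix_stream is a hypothetical helper, not part of EasyNetwork, and the client performs its own validation regardless.

```python
import socket


def is_connected_unix_stream(sock: socket.socket) -> bool:
    """Return True if *sock* is a connected AF_UNIX / SOCK_STREAM socket."""
    if sock.family != socket.AF_UNIX or sock.type != socket.SOCK_STREAM:
        return False
    try:
        # getpeername() fails with OSError (ENOTCONN) on an unconnected socket.
        sock.getpeername()
    except OSError:
        return False
    return True
```

For quick experiments, socket.socketpair() returns two sockets that already satisfy all three conditions on Unix platforms.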

Sending Packets

Objects passed as arguments are automatically converted to bytes to send to the remote host thanks to the protocol object.

client.send_packet({"data": 42})

Sending Packets with socket control messages

By using SocketAncillary, you can send SCM data. See the Unix manual page sendmsg(2) for details.

from easynetwork.lowlevel.socket import SocketAncillary

ancillary = SocketAncillary()
ancillary.add_fds([4])
client.send_packet({"data": 42}, ancillary_data=ancillary)
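For readers unfamiliar with file descriptor passing, the underlying SCM_RIGHTS mechanism can be observed with the standard library alone. The sketch below does not use EasyNetwork: it relies on socket.send_fds() and socket.recv_fds() (Python 3.9+, Unix only), and pass_fd_over_socketpair is a hypothetical helper name.

```python
import os
import socket


def pass_fd_over_socketpair(payload: bytes) -> bytes:
    """Send the read end of a pipe over a Unix socket and read *payload* from the received copy.

    *payload* must fit in the pipe buffer, since it is written before anything reads it.
    """
    left, right = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
    read_fd, write_fd = os.pipe()
    with left, right:
        try:
            os.write(write_fd, payload)
        finally:
            os.close(write_fd)
        try:
            # At least one byte of regular data must accompany the control message.
            socket.send_fds(left, [b"x"], [read_fd])
        finally:
            # The sender's copy can be closed; the in-flight descriptor stays valid.
            os.close(read_fd)

        _data, fds, _flags, _addr = socket.recv_fds(right, 1, 1)
        received_fd = fds[0]
        try:
            return os.read(received_fd, len(payload))
        finally:
            os.close(received_fd)
```

The receiver gets a new descriptor number that refers to the same open file, which is why the sender may close its own copy right after sending.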

Receiving Packets

You get the next available packet, already parsed. Extraneous data is kept for the next call.

packet = client.recv_packet()
print(f"Received packet: {packet!r}")

You can control the receive timeout with the timeout parameter:

try:
    packet = client.recv_packet(timeout=30)
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")

Tip

Remember to catch invalid data parsing errors.

from easynetwork.exceptions import StreamProtocolParseError

try:
    packet = client.recv_packet(timeout=30)
except StreamProtocolParseError:
    print("Received something, but was not valid")
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
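The buffering behavior described in this section (one parsed packet per call, leftover bytes kept for the next call, parse errors reported for invalid data) can be pictured with a small model. This is only a sketch assuming a hypothetical newline-delimited JSON framing; LineJSONBuffer is not an EasyNetwork class and does not reflect the library's actual implementation.

```python
import json
from typing import Any


class LineJSONBuffer:
    """Toy receive buffer: one JSON document per line, extra bytes kept for later."""

    def __init__(self) -> None:
        self._buffer = bytearray()

    def feed(self, chunk: bytes) -> None:
        """Append raw bytes, as if they had just been read from the socket."""
        self._buffer += chunk

    def next_packet(self) -> Any:
        """Return the next complete packet.

        Raises LookupError if no complete line is buffered yet, and ValueError
        (json.JSONDecodeError) if the line is not valid JSON. An invalid line
        is discarded, so the following call moves on to the next one.
        """
        line, separator, rest = bytes(self._buffer).partition(b"\n")
        if not separator:
            raise LookupError("no complete packet in the buffer yet")
        self._buffer = bytearray(rest)  # keep extraneous data for the next call
        return json.loads(line)
```

In this model, a LookupError plays the role of "keep waiting for more data", while a ValueError corresponds to the invalid-data case that the tip above recommends catching.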

Receiving Packets with socket control messages

By using SocketAncillary, you can receive SCM data. See the Unix manual page recvmsg(2) for details.

from easynetwork.lowlevel.socket import SCMCredentials, SCMRights, SocketAncillary

ancillary = SocketAncillary()
packet = client.recv_packet(ancillary_data=ancillary)
print(f"Received packet: {packet!r}")
for message in ancillary.messages():
    match message:
        case SCMRights(fds):
            for fd in fds:
                print(f"Received file descriptor: {fd}")
        case SCMCredentials(credentials):
            for ucred in credentials:
                print(f"Received unix credential: {ucred}")

Tip

The default buffer size for this operation is approximately 8 KiB. However, you can customize this behavior.

from socket import CMSG_LEN

from easynetwork.lowlevel.socket import SocketAncillary

max_fds = 128
ancillary = SocketAncillary()
packet = client.recv_packet(ancillary_data=ancillary, ancillary_bufsize=CMSG_LEN(max_fds * 4))
print(f"Received packet: {packet!r}")
for fd in ancillary.iter_fds():
    print(f"Received file descriptor: {fd}")
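The factor 4 in the example above is the size in bytes of a C int on common platforms. As a small sketch, the size can instead be derived at runtime with the standard library; ancillary_bufsize_for_fds is a hypothetical helper name, not an EasyNetwork function.

```python
import array
import socket


def ancillary_bufsize_for_fds(max_fds: int) -> int:
    """Return an ancillary buffer size large enough for *max_fds* file descriptors."""
    fd_size = array.array("i").itemsize  # size of a C int on this platform
    return socket.CMSG_LEN(max_fds * fd_size)
```

The result includes the control message header, so it is always larger than the raw size of the descriptors themselves.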

Receiving Multiple Packets At Once

You can use iter_received_packets() to iterate over all the received packets, for example to collect them in a sequence or a set.

all_packets = list(client.iter_received_packets())

The timeout parameter defaults to zero to get only the data already in the buffer, but you can change it.

all_packets = list(client.iter_received_packets(timeout=1))

See also

UnixStreamClient.iter_received_packets()

The method description and usage (especially for the timeout parameter).

Close The Write-End Stream

If you are sure you will never reuse send_packet(), you can call send_eof() to shut down the write stream.

client.send_eof()

Note

send_eof() will block until all unsent data has been flushed before closing the stream.
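At the socket level, closing only the write end is a "half-close": the peer sees end-of-stream, but data can still flow back. The sketch below shows this with raw standard-library sockets and socket.shutdown(); it assumes nothing about how send_eof() is implemented, and half_close_roundtrip is a hypothetical helper. Messages are kept small so that each one arrives in a single recv() call.

```python
from __future__ import annotations

import socket


def half_close_roundtrip(request: bytes, reply: bytes) -> tuple[bytes, bytes, bytes]:
    """Send *request*, half-close, and show that the peer can still answer with *reply*."""
    client, server = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
    with client, server:
        client.sendall(request)
        client.shutdown(socket.SHUT_WR)  # no more writes from the client side

        received = server.recv(1024)
        end_of_stream = server.recv(1024)  # an empty bytes object signals EOF

        server.sendall(reply)  # the other direction is still open
        answer = client.recv(1024)
    return received, end_of_stream, answer
```

This is why a client that has sent EOF can keep receiving packets until the peer closes its own side.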

Peer Credentials

Get the peer’s process information with get_peer_credentials():

peer_creds = client.get_peer_credentials()
print(f"pid={peer_creds.pid}, user_id={peer_creds.uid}, group_id={peer_creds.gid}")
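On Linux, this kind of information is available from the kernel through the SO_PEERCRED socket option. The sketch below reads it from a raw socket with the standard library; it is Linux-specific, peer_credentials is a hypothetical helper, and it is an assumption that the library relies on a comparable mechanism (other platforms use different options).

```python
from __future__ import annotations

import socket
import struct

# Layout of the Linux "struct ucred": pid_t pid; uid_t uid; gid_t gid;
_UCRED_FORMAT = "iII"


def peer_credentials(sock: socket.socket) -> tuple[int, int, int]:
    """Return (pid, uid, gid) of the process at the other end of *sock* (Linux only)."""
    raw = sock.getsockopt(
        socket.SOL_SOCKET,
        socket.SO_PEERCRED,
        struct.calcsize(_UCRED_FORMAT),
    )
    pid, uid, gid = struct.unpack(_UCRED_FORMAT, raw)
    return pid, uid, gid
```

With a socketpair() created inside one process, both ends report the credentials of that same process.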

Low-Level Socket Operations

For low-level operations such as setsockopt(), the client object exposes the socket through a SocketProxy:

client.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, False)

socket.recv() Buffer Size

By default, the client uses a reasonable buffer size when calling recv_packet(). You can control this value by setting the max_recv_size parameter:

with UnixStreamClient(address, protocol, max_recv_size=1024) as client:
    # Only do socket.recv(1024) calls
    packet = client.recv_packet()

Note

max_recv_size is also used as a size hint for buffered serializers. See this section for details.
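The effect of a receive size limit can be observed on raw sockets: each recv() call returns at most the requested number of bytes, so a larger message is consumed in several chunks. This is a standard-library sketch, not EasyNetwork code; read_in_chunks is a hypothetical helper, and total should stay well below the socket buffer size because everything is written before reading starts.

```python
from __future__ import annotations

import socket


def read_in_chunks(total: int, max_recv_size: int) -> list[int]:
    """Send *total* bytes and return the size of each chunk read with recv(max_recv_size)."""
    writer, reader = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
    sizes: list[int] = []
    with writer, reader:
        writer.sendall(b"x" * total)
        writer.shutdown(socket.SHUT_WR)  # lets the reader see end-of-stream
        while chunk := reader.recv(max_recv_size):
            sizes.append(len(chunk))
    return sizes
```

A smaller limit means more system calls for the same amount of data, while a larger one means a bigger buffer per call.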

Concurrency And Multithreading

All client methods are thread-safe. Synchronization follows these rules: