How-to — TCP Client Endpoints

Note

This page uses two different API variants:

  • Synchronous API with classic def functions, usable in any context.

  • Asynchronous API with async def functions, using an asynchronous framework to perform I/O operations.

All asynchronous API examples assume that you are using asyncio, but you can use a different library thanks to the asynchronous backend engine API.


The Basics

The Protocol Object

The TCP clients expect a StreamProtocol instance to communicate with the remote endpoint.

See also

How-to — Communication Protocols

Explains what a StreamProtocol is and how to use it.

Connecting To The Remote Host

You need the host address (domain name or IP) and the port of connection in order to connect to the remote host:

 1from __future__ import annotations
 2
 3from easynetwork.api_sync.client import TCPNetworkClient
 4from easynetwork.protocol import StreamProtocol
 5from easynetwork.serializers import JSONSerializer
 6
 7
 8def main() -> None:
 9    protocol = StreamProtocol(JSONSerializer())
10    address = ("localhost", 9000)
11
12    with TCPNetworkClient(address, protocol) as client:
13        print(f"Remote address: {client.get_remote_address()}")
14
15        ...
16
17
18if __name__ == "__main__":
19    main()

You can control the connection timeout with the connect_timeout parameter:

 8def main() -> None:
 9    protocol = StreamProtocol(JSONSerializer())
10    address = ("localhost", 9000)
11
12    try:
13        client = TCPNetworkClient(address, protocol, connect_timeout=30)
14    except TimeoutError:
15        print(f"Could not connect to {address} after 30 seconds")
16        return
17
18    with client:
19        print(f"Remote address: {client.get_remote_address()}")
20
21        ...

Note

The client does nothing when it enters the with context. Everything is done on object creation.

Using An Already Connected Socket

If you have your own way to obtain a connected socket.socket instance, you can pass it to the client.

If the socket is not connected, an OSError is raised.

Important

It must be a SOCK_STREAM socket with AF_INET or AF_INET6 family.

Warning

The resource ownership is given to the client. You must close the client to close the socket.

 1from __future__ import annotations
 2
 3import socket
 4
 5from easynetwork.api_sync.client import TCPNetworkClient
 6from easynetwork.protocol import StreamProtocol
 7from easynetwork.serializers import JSONSerializer
 8
 9
10def obtain_a_connected_socket() -> socket.socket:
11    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
12
13    ...
14
15    return sock
16
17
18def main() -> None:
19    protocol = StreamProtocol(JSONSerializer())
20    sock = obtain_a_connected_socket()
21
22    with TCPNetworkClient(sock, protocol) as client:
23        print(f"Remote address: {client.get_remote_address()}")
24
25        ...
26
27
28if __name__ == "__main__":
29    main()

Basic Usage

Sending Packets

There’s not much to say, except that objects passed as arguments are automatically converted to bytes to send to the remote host thanks to the protocol object.

1client.send_packet({"data": 42})

Receiving Packets

You get the next available packet, already parsed. Extraneous data is kept for the next call.

1packet = client.recv_packet()
2print(f"Received packet: {packet!r}")

You can control the receive timeout with the timeout parameter:

1try:
2    packet = client.recv_packet(timeout=30)
3except TimeoutError:
4    print("Timed out")
5else:
6    print(f"Received packet: {packet!r}")

Tip

Remember to catch invalid data parsing errors.

1try:
2    packet = client.recv_packet(timeout=30)
3except StreamProtocolParseError:
4    print("Received something, but was not valid")
5except TimeoutError:
6    print("Timed out")
7else:
8    print(f"Received packet: {packet!r}")

Receiving Multiple Packets At Once

You can use iter_received_packets() to get all the received packets in a sequence or a set.

1all_packets = [p for p in client.iter_received_packets()]

The timeout parameter defaults to zero to get only the data already in the buffer, but you can change it.

1all_packets = [p for p in client.iter_received_packets(timeout=1)]

See also

TCPNetworkClient.iter_received_packets()

The method description and usage (especially for the timeout parameter).

Advanced Usage

Note

This section is for people who know what they’re doing and are looking for something specific.

Close The Write-End Stream

If you are sure you will never reuse send_packet(), you can call send_eof() to shut down the write stream.

1client.send_eof()

Note

send_eof() will block until all unsent data has been flushed before closing the stream.

Low-Level Socket Operations

For low-level operations such as setsockopt(), the client object exposes the socket through a SocketProxy:

1client.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, False)

socket.recv() Buffer Size

By default, the client uses a reasonable buffer size when calling recv_packet(). You can control this value by setting the max_recv_size parameter:

1with TCPNetworkClient(address, protocol, max_recv_size=1024) as client:
2    # Only do socket.recv(1024) calls
3    packet = client.recv_packet()

Note

max_recv_size is also used as a size hint for buffered serializers. See this section for details.

SSL/TLS Connection

If you want to use SSL to communicate with the remote host, the easiest way is to pass ssl=True:

1with TCPNetworkClient(address, protocol, ssl=True) as client:
2    client.send_packet({"data": 42})
3
4    packet = client.recv_packet()

Note

Since this is a client object, ssl.SSLContext.wrap_socket() and ssl.SSLContext.wrap_bio() are always called with server_side=False.

Danger

You can pass an SSLContext instead, but at this point I expect you to really know what you are doing.

Concurrency And Multithreading

All client methods are thread-safe. Synchronization follows these rules:

This allows you to do something like this:

 1from __future__ import annotations
 2
 3import contextlib
 4import threading
 5import traceback
 6from concurrent.futures import ThreadPoolExecutor
 7from typing import Any, TypeAlias
 8
 9from easynetwork.api_sync.client import TCPNetworkClient
10from easynetwork.exceptions import StreamProtocolParseError
11from easynetwork.protocol import StreamProtocol
12from easynetwork.serializers import JSONSerializer
13
14RequestType: TypeAlias = dict[str, Any]
15ResponseType: TypeAlias = dict[str, Any]
16
17
18def consumer(response: ResponseType) -> None:
19    # Do some stuff...
20
21    print(response)
22
23
24def receiver_worker(
25    client: TCPNetworkClient[RequestType, ResponseType],
26    executor: ThreadPoolExecutor,
27) -> None:
28    while True:
29        try:
30            # Pass timeout=None to get an infinite iterator.
31            # It will be terminated when the client.close() method has been called.
32            for response in client.iter_received_packets(timeout=None):
33                executor.submit(consumer, response)
34        except StreamProtocolParseError:
35            print("Parsing error")
36            traceback.print_exc()
37            continue
38
39
40def do_main_stuff(client: TCPNetworkClient[RequestType, ResponseType]) -> None:
41    while True:
42        # Do some stuff...
43        request = {"data": 42}
44
45        client.send_packet(request)
46
47
48def main() -> None:
49    remote_address = ("localhost", 9000)
50    protocol = StreamProtocol(JSONSerializer())
51
52    with contextlib.ExitStack() as exit_stack:
53        # thread pool executor setup
54        executor = exit_stack.enter_context(ThreadPoolExecutor())
55
56        # connect to remote
57        client = TCPNetworkClient(remote_address, protocol)
58
59        # receiver_worker thread setup
60        receiver_worker_thread = threading.Thread(
61            target=receiver_worker,
62            args=(client, executor),
63        )
64        receiver_worker_thread.start()
65        exit_stack.callback(receiver_worker_thread.join)
66
67        # add the client close, so it will be closed
68        # before joining the thread
69        exit_stack.enter_context(client)
70
71        # Setup done, let's go
72        do_main_stuff(client)
73
74
75if __name__ == "__main__":
76    main()

SSL/TLS Considerations

For safety, concurrent calls to send_packet() and recv_packet() are “disabled” by default when using SSL. In fact, they share the same synchronization lock.

If you need this feature after all, you can pass ssl_shared_lock=False at object creation.

Danger

But you don’t need it, do you?

1client = TCPNetworkClient(
2    remote_address,
3    protocol,
4    ssl=True,
5    ssl_shared_lock=False,
6)