How-to — TCP Client Endpoints
Note
This page uses two different API variants:
- Synchronous API with classic
def
functions, usable in any context.
- Asynchronous API with
async def
functions, using an asynchronous framework to perform I/O operations.
All asynchronous API examples assume that you are using asyncio,
but you can use a different library thanks to the asynchronous backend engine API.
The Basics
The Protocol Object
The TCP clients expect a StreamProtocol
instance to communicate with the remote endpoint.
See also
- How-to — Communication Protocols
Explains what a
StreamProtocol
is and how to use it.
Connecting To The Remote Host
You need the host address (domain name or IP) and the port number in order to connect to the remote host:
1from __future__ import annotations
2
3from easynetwork.api_sync.client import TCPNetworkClient
4from easynetwork.protocol import StreamProtocol
5from easynetwork.serializers import JSONSerializer
6
7
8def main() -> None:
9 protocol = StreamProtocol(JSONSerializer())
10 address = ("localhost", 9000)
11
12 with TCPNetworkClient(address, protocol) as client:
13 print(f"Remote address: {client.get_remote_address()}")
14
15 ...
16
17
18if __name__ == "__main__":
19 main()
You can control the connection timeout with the connect_timeout
parameter:
8def main() -> None:
9 protocol = StreamProtocol(JSONSerializer())
10 address = ("localhost", 9000)
11
12 try:
13 client = TCPNetworkClient(address, protocol, connect_timeout=30)
14 except TimeoutError:
15 print(f"Could not connect to {address} after 30 seconds")
16 return
17
18 with client:
19 print(f"Remote address: {client.get_remote_address()}")
20
21 ...
Note
The client does nothing when it enters the with
context. Everything is done on object creation.
1from __future__ import annotations
2
3import asyncio
4
5from easynetwork.api_async.client import AsyncTCPNetworkClient
6from easynetwork.protocol import StreamProtocol
7from easynetwork.serializers import JSONSerializer
8
9
10async def main() -> None:
11 protocol = StreamProtocol(JSONSerializer())
12 address = ("localhost", 9000)
13
14 async with AsyncTCPNetworkClient(address, protocol) as client:
15 print(f"Remote address: {client.get_remote_address()}")
16
17 ...
18
19
20if __name__ == "__main__":
21 asyncio.run(main())
You can control the connection timeout by adding a timeout scope using the asynchronous framework:
10async def main() -> None:
11 protocol = StreamProtocol(JSONSerializer())
12 address = ("localhost", 9000)
13
14 try:
15 async with asyncio.timeout(30):
16 client = AsyncTCPNetworkClient(address, protocol)
17 await client.wait_connected()
18 except TimeoutError:
19 print(f"Could not connect to {address} after 30 seconds")
20 return
21
22 async with client:
23 print(f"Remote address: {client.get_remote_address()}")
24
25 ...
Note
The call to wait_connected()
is required to actually initialize the client, since we cannot perform asynchronous operations
at object creation. This is what the client does when it enters the async with
context.
Once completed, wait_connected()
is a no-op.
Using An Already Connected Socket
If you have your own way to obtain a connected socket.socket
instance, you can pass it to the client.
If the socket is not connected, an OSError
is raised.
Important
It must be a SOCK_STREAM
socket with AF_INET
or AF_INET6
family.
Warning
The resource ownership is given to the client. You must close the client to close the socket.
1from __future__ import annotations
2
3import socket
4
5from easynetwork.api_sync.client import TCPNetworkClient
6from easynetwork.protocol import StreamProtocol
7from easynetwork.serializers import JSONSerializer
8
9
10def obtain_a_connected_socket() -> socket.socket:
11 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
12
13 ...
14
15 return sock
16
17
18def main() -> None:
19 protocol = StreamProtocol(JSONSerializer())
20 sock = obtain_a_connected_socket()
21
22 with TCPNetworkClient(sock, protocol) as client:
23 print(f"Remote address: {client.get_remote_address()}")
24
25 ...
26
27
28if __name__ == "__main__":
29 main()
1from __future__ import annotations
2
3import asyncio
4import socket
5
6from easynetwork.api_async.client import AsyncTCPNetworkClient
7from easynetwork.protocol import StreamProtocol
8from easynetwork.serializers import JSONSerializer
9
10
11async def obtain_a_connected_socket() -> socket.socket:
12 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
13
14 ...
15
16 return sock
17
18
19async def main() -> None:
20 protocol = StreamProtocol(JSONSerializer())
21 sock = await obtain_a_connected_socket()
22
23 async with AsyncTCPNetworkClient(sock, protocol) as client:
24 print(f"Remote address: {client.get_remote_address()}")
25
26 ...
27
28
29if __name__ == "__main__":
30 asyncio.run(main())
Note
Even with a ready-to-use socket, the call to wait_connected()
is still required.
Basic Usage
Sending Packets
There’s not much to say, except that objects passed as arguments are automatically converted to bytes to send to the remote host thanks to the protocol object.
1client.send_packet({"data": 42})
1await client.send_packet({"data": 42})
Receiving Packets
You get the next available packet, already parsed. Extraneous data is kept for the next call.
1packet = client.recv_packet()
2print(f"Received packet: {packet!r}")
You can control the receive timeout with the timeout
parameter:
1try:
2 packet = client.recv_packet(timeout=30)
3except TimeoutError:
4 print("Timed out")
5else:
6 print(f"Received packet: {packet!r}")
1packet = await client.recv_packet()
2print(f"Received packet: {packet!r}")
You can control the receive timeout by adding a timeout scope using the asynchronous framework:
1try:
2 async with asyncio.timeout(30):
3 packet = await client.recv_packet()
4except TimeoutError:
5 print("Timed out")
6else:
7 print(f"Received packet: {packet!r}")
Tip
Remember to catch invalid data parsing errors.
1try:
2 packet = client.recv_packet(timeout=30)
3except StreamProtocolParseError:
4 print("Received something, but was not valid")
5except TimeoutError:
6 print("Timed out")
7else:
8 print(f"Received packet: {packet!r}")
1try:
2 async with asyncio.timeout(30):
3 packet = await client.recv_packet()
4except StreamProtocolParseError:
5 print("Received something, but was not valid")
6except TimeoutError:
7 print("Timed out")
8else:
9 print(f"Received packet: {packet!r}")
Receiving Multiple Packets At Once
You can use iter_received_packets()
to get all the received packets in a sequence or a set.
1all_packets = [p for p in client.iter_received_packets()]
1all_packets = [p async for p in client.iter_received_packets()]
The timeout
parameter defaults to zero to get only the data already in the buffer, but you can change it.
1all_packets = [p for p in client.iter_received_packets(timeout=1)]
See also
TCPNetworkClient.iter_received_packets()
The method description and usage (especially for the
timeout
parameter).
1all_packets = [p async for p in client.iter_received_packets(timeout=1)]
See also
AsyncTCPNetworkClient.iter_received_packets()
The method description and usage (especially for the
timeout
parameter).
Advanced Usage
Note
This section is for people who know what they’re doing and are looking for something specific.
Close The Write-End Stream
If you are sure you will never call send_packet() again, you can call send_eof()
to shut down the write stream.
1client.send_eof()
1await client.send_eof()
Note
send_eof()
will block until all unsent data has been flushed before closing the stream.
Low-Level Socket Operations
For low-level operations such as setsockopt()
, the client object exposes the socket through a SocketProxy
:
1client.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, False)
1client.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, False)
Warning
Make sure that wait_connected()
has been called before.
socket.recv()
Buffer Size
By default, the client uses a reasonable buffer size when calling recv_packet()
.
You can control this value by setting the max_recv_size
parameter:
1with TCPNetworkClient(address, protocol, max_recv_size=1024) as client:
2 # Only do socket.recv(1024) calls
3 packet = client.recv_packet()
1async with AsyncTCPNetworkClient(address, protocol, max_recv_size=1024) as client:
2 # Only do socket.recv(1024) calls
3 packet = await client.recv_packet()
Note
max_recv_size
is also used as a size hint for buffered serializers.
See this section for details.
SSL/TLS Connection
If you want to use SSL to communicate with the remote host, the easiest way is to pass ssl=True
:
1with TCPNetworkClient(address, protocol, ssl=True) as client:
2 client.send_packet({"data": 42})
3
4 packet = client.recv_packet()
1async with AsyncTCPNetworkClient(address, protocol, ssl=True) as client:
2 await client.send_packet({"data": 42})
3
4 packet = await client.recv_packet()
Note
Since this is a client object, ssl.SSLContext.wrap_socket()
and ssl.SSLContext.wrap_bio()
are always called
with server_side=False
.
Danger
You can pass an SSLContext
instead, but at this point I expect you to really know what you are doing.
Concurrency And Multithreading
All client methods are thread-safe. Synchronization follows these rules:
- send_packet() and recv_packet() do not share the same threading.Lock instance.
- close() will not wait for recv_packet().
- The client.socket methods are also thread-safe. This means that you cannot access the underlying socket methods (e.g. getsockopt()) during a write operation.
This allows you to do something like this:
1from __future__ import annotations
2
3import contextlib
4import threading
5import traceback
6from concurrent.futures import ThreadPoolExecutor
7from typing import Any, TypeAlias
8
9from easynetwork.api_sync.client import TCPNetworkClient
10from easynetwork.exceptions import StreamProtocolParseError
11from easynetwork.protocol import StreamProtocol
12from easynetwork.serializers import JSONSerializer
13
14RequestType: TypeAlias = dict[str, Any]
15ResponseType: TypeAlias = dict[str, Any]
16
17
18def consumer(response: ResponseType) -> None:
19 # Do some stuff...
20
21 print(response)
22
23
24def receiver_worker(
25 client: TCPNetworkClient[RequestType, ResponseType],
26 executor: ThreadPoolExecutor,
27) -> None:
28 while True:
29 try:
30 # Pass timeout=None to get an infinite iterator.
31 # It will be terminated when the client.close() method has been called.
32 for response in client.iter_received_packets(timeout=None):
33 executor.submit(consumer, response)
34 except StreamProtocolParseError:
35 print("Parsing error")
36 traceback.print_exc()
37 continue
38
39
40def do_main_stuff(client: TCPNetworkClient[RequestType, ResponseType]) -> None:
41 while True:
42 # Do some stuff...
43 request = {"data": 42}
44
45 client.send_packet(request)
46
47
48def main() -> None:
49 remote_address = ("localhost", 9000)
50 protocol = StreamProtocol(JSONSerializer())
51
52 with contextlib.ExitStack() as exit_stack:
53 # thread pool executor setup
54 executor = exit_stack.enter_context(ThreadPoolExecutor())
55
56 # connect to remote
57 client = TCPNetworkClient(remote_address, protocol)
58
59 # receiver_worker thread setup
60 receiver_worker_thread = threading.Thread(
61 target=receiver_worker,
62 args=(client, executor),
63 )
64 receiver_worker_thread.start()
65 exit_stack.callback(receiver_worker_thread.join)
66
67 # add the client close, so it will be closed
68 # before joining the thread
69 exit_stack.enter_context(client)
70
71 # Setup done, let's go
72 do_main_stuff(client)
73
74
75if __name__ == "__main__":
76 main()
Client methods do not require external task synchronization. Synchronization follows these rules:
- send_packet() and recv_packet() do not share the same lock instance.
- close() will not wait for recv_packet().
This allows you to do something like this:
1from __future__ import annotations
2
3import asyncio
4import contextlib
5import traceback
6from typing import Any, TypeAlias
7
8from easynetwork.api_async.client import AsyncTCPNetworkClient
9from easynetwork.exceptions import StreamProtocolParseError
10from easynetwork.protocol import StreamProtocol
11from easynetwork.serializers import JSONSerializer
12
13RequestType: TypeAlias = dict[str, Any]
14ResponseType: TypeAlias = dict[str, Any]
15
16
17async def consumer(response: ResponseType) -> None:
18 # Do some stuff...
19
20 print(response)
21
22
23async def receiver_worker(
24 client: AsyncTCPNetworkClient[RequestType, ResponseType],
25 task_group: asyncio.TaskGroup,
26) -> None:
27 while True:
28 try:
29 # Pass timeout=None to get an infinite iterator.
30 # It will be terminated when the client.close() method has been called.
31 async for response in client.iter_received_packets(timeout=None):
32 _ = task_group.create_task(consumer(response))
33 except StreamProtocolParseError:
34 print("Parsing error")
35 traceback.print_exc()
36 continue
37
38
39async def do_main_stuff(
40 client: AsyncTCPNetworkClient[RequestType, ResponseType],
41) -> None:
42 while True:
43 # Do some stuff...
44 request = {"data": 42}
45
46 await client.send_packet(request)
47
48
49async def main() -> None:
50 remote_address = ("localhost", 9000)
51 protocol = StreamProtocol(JSONSerializer())
52
53 async with contextlib.AsyncExitStack() as exit_stack:
54 # task group setup
55 task_group = await exit_stack.enter_async_context(asyncio.TaskGroup())
56
57 # connect to remote
58 client = AsyncTCPNetworkClient(remote_address, protocol)
59 await exit_stack.enter_async_context(client)
60
61 # receiver_worker task setup
62 _ = task_group.create_task(receiver_worker(client, task_group))
63
64 # Setup done, let's go
65 await do_main_stuff(client)
66
67
68if __name__ == "__main__":
69 asyncio.run(main())
SSL/TLS Considerations
For safety, concurrent calls to send_packet()
and recv_packet()
are “disabled” by default when using SSL.
In fact, they share the same synchronization lock.
If you need this feature after all, you can pass ssl_shared_lock=False
at object creation.
Danger
But you don’t need it, do you?
1client = TCPNetworkClient(
2 remote_address,
3 protocol,
4 ssl=True,
5 ssl_shared_lock=False,
6)
1client = AsyncTCPNetworkClient(
2 remote_address,
3 protocol,
4 ssl=True,
5 ssl_shared_lock=False,
6)