How-to — TCP Client Endpoints
Note
This page uses two different API variants:
- Synchronous API with classic def functions, usable in any context.
- Asynchronous API with async def functions, using an asynchronous framework to perform I/O operations.
All asynchronous API examples assume that you are using asyncio, but you can use a different library thanks to the asynchronous backend engine API.
The Protocol Object
The TCP clients expect a StreamProtocol instance to communicate with the remote endpoint.
See also
- How-to — Communication Protocols
  Explains what a StreamProtocol is and how to use it.
Connecting To The Remote Host
To connect to the remote host, you need its address (domain name or IP) and the port to connect to:
from __future__ import annotations

from easynetwork.clients import TCPNetworkClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer


def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = ("localhost", 9000)

    with TCPNetworkClient(address, protocol) as client:
        print(f"Remote address: {client.get_remote_address()}")

        ...


if __name__ == "__main__":
    main()
You can control the connection timeout with the connect_timeout parameter:
def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = ("localhost", 9000)

    try:
        client = TCPNetworkClient(address, protocol, connect_timeout=30)
    except TimeoutError:
        print(f"Could not connect to {address} after 30 seconds")
        return

    with client:
        print(f"Remote address: {client.get_remote_address()}")

        ...
Note
The client does nothing when it enters the with context. Everything is done on object creation.
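For comparison, the same "connect at creation time, bounded by a timeout" pattern can be sketched with the standard library alone. This is not how TCPNetworkClient is implemented; try_connect is a hypothetical helper that only shows the connection attempt, and therefore the timeout, happening before any with block is entered.

```python
from __future__ import annotations

import socket


def try_connect(address: tuple[str, int], timeout: float) -> socket.socket | None:
    """Return a connected TCP socket, or None if the attempt fails or times out."""
    try:
        # create_connection() applies `timeout` to each connection attempt.
        sock = socket.create_connection(address, timeout=timeout)
    except OSError:
        # TimeoutError and ConnectionRefusedError are both OSError subclasses.
        return None
    # The timeout was only meant for the connection; go back to blocking mode.
    sock.settimeout(None)
    return sock
```

Returning None keeps the sketch short; the client shown above reports the same situation by raising TimeoutError.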
from __future__ import annotations

import asyncio

from easynetwork.clients import AsyncTCPNetworkClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer


async def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = ("localhost", 9000)

    async with AsyncTCPNetworkClient(address, protocol) as client:
        print(f"Remote address: {client.get_remote_address()}")

        ...


if __name__ == "__main__":
    asyncio.run(main())
You can control the connection timeout by adding a timeout scope using the asynchronous framework:
async def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    address = ("localhost", 9000)

    try:
        async with asyncio.timeout(30):
            client = AsyncTCPNetworkClient(address, protocol)
            await client.wait_connected()
    except TimeoutError:
        print(f"Could not connect to {address} after 30 seconds")
        return

    async with client:
        print(f"Remote address: {client.get_remote_address()}")

        ...
Note
The call to wait_connected() is required to actually initialize the client, since asynchronous operations cannot be performed at object creation.
This is what the client does when it enters the async with context.
Once completed, wait_connected() is a no-op.
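The timeout scope used above, asyncio.timeout(), is only available on Python 3.11 and later. As a rough sketch of the same idea for older interpreters, asyncio.wait_for() can bound any awaitable; wait_or_none is a hypothetical helper name, not part of easynetwork.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable
from typing import TypeVar

T = TypeVar("T")


async def wait_or_none(awaitable: Awaitable[T], seconds: float) -> T | None:
    """Await `awaitable`, cancelling it and returning None after `seconds`."""
    try:
        return await asyncio.wait_for(awaitable, timeout=seconds)
    except asyncio.TimeoutError:
        # On Python 3.11+, asyncio.TimeoutError is the built-in TimeoutError.
        return None
```

With such a helper, a call like wait_or_none(client.wait_connected(), 30) would presumably play the role of the asyncio.timeout(30) block, although that combination is untested here.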
Using An Already Connected Socket
If you have your own way to obtain a connected socket.socket instance, you can pass it to the client.
If the socket is not connected, an OSError is raised.
Important
It must be a SOCK_STREAM socket with AF_INET or AF_INET6 family.
Warning
The resource ownership is given to the client. You must close the client to close the socket.
from __future__ import annotations

import socket

from easynetwork.clients import TCPNetworkClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer


def obtain_a_connected_socket() -> socket.socket:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    ...

    return sock


def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    sock = obtain_a_connected_socket()

    with TCPNetworkClient(sock, protocol) as client:
        print(f"Remote address: {client.get_remote_address()}")

        ...


if __name__ == "__main__":
    main()
from __future__ import annotations

import asyncio
import socket

from easynetwork.clients import AsyncTCPNetworkClient
from easynetwork.protocol import StreamProtocol
from easynetwork.serializers import JSONSerializer


async def obtain_a_connected_socket() -> socket.socket:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    ...

    return sock


async def main() -> None:
    protocol = StreamProtocol(JSONSerializer())
    sock = await obtain_a_connected_socket()

    async with AsyncTCPNetworkClient(sock, protocol) as client:
        print(f"Remote address: {client.get_remote_address()}")

        ...


if __name__ == "__main__":
    asyncio.run(main())
Note
With the asynchronous client, the call to wait_connected() is still required, even with a ready-to-use socket.
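The requirements listed in this section (a connected SOCK_STREAM socket of the AF_INET or AF_INET6 family) can be checked up front with the standard library. The sketch below is an illustration only: check_tcp_socket is a hypothetical helper, and the choice of ValueError for a wrong type or family is an assumption, not the documented behavior of the client.

```python
from __future__ import annotations

import socket


def check_tcp_socket(sock: socket.socket) -> None:
    """Raise if `sock` is not a connected TCP/IP stream socket."""
    if sock.type != socket.SOCK_STREAM:
        raise ValueError("a SOCK_STREAM socket is expected")
    if sock.family not in (socket.AF_INET, socket.AF_INET6):
        raise ValueError("only AF_INET and AF_INET6 families are supported")
    # getpeername() raises OSError on a socket that is not connected.
    sock.getpeername()
```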
Sending Packets
Objects passed as arguments are automatically converted to bytes by the protocol object before being sent to the remote host.
client.send_packet({"data": 42})
await client.send_packet({"data": 42})
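To give an idea of what "converted to bytes" can mean, here is a minimal sketch that serializes an object as one line of JSON and writes it to a plain socket. The newline-delimited framing and the send_json_line helper are assumptions made for illustration; the actual wire format depends on the serializer and protocol you configure.

```python
from __future__ import annotations

import json
import socket
from typing import Any


def send_json_line(sock: socket.socket, packet: Any) -> None:
    """Serialize `packet` to one line of JSON and write all of it to `sock`."""
    data = json.dumps(packet).encode("utf-8") + b"\n"
    # sendall() keeps writing until every byte has been handed to the OS.
    sock.sendall(data)
```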
Receiving Packets
recv_packet() returns the next available packet, already parsed. Extraneous data is kept for the next call.
packet = client.recv_packet()
print(f"Received packet: {packet!r}")
You can control the receive timeout with the timeout parameter:
try:
    packet = client.recv_packet(timeout=30)
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
packet = await client.recv_packet()
print(f"Received packet: {packet!r}")
You can control the receive timeout by adding a timeout scope using the asynchronous framework:
try:
    async with asyncio.timeout(30):
        packet = await client.recv_packet()
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
Tip
Remember to catch invalid data parsing errors.
from easynetwork.exceptions import StreamProtocolParseError

try:
    packet = client.recv_packet(timeout=30)
except StreamProtocolParseError:
    print("Received something, but was not valid")
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
from easynetwork.exceptions import StreamProtocolParseError

try:
    async with asyncio.timeout(30):
        packet = await client.recv_packet()
except StreamProtocolParseError:
    print("Received something, but was not valid")
except TimeoutError:
    print("Timed out")
else:
    print(f"Received packet: {packet!r}")
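The two ideas in this section, keeping extraneous data for the next call and reporting invalid data as a parse error, can be illustrated with a small standalone buffer. This is a sketch under assumptions: LineBuffer is a hypothetical class, packets are assumed to be newline-delimited JSON, and the built-in LookupError and ValueError stand in for whatever exceptions a real protocol raises.

```python
from __future__ import annotations

import json
from typing import Any


class LineBuffer:
    """Accumulates raw bytes and hands out one parsed JSON line at a time."""

    def __init__(self) -> None:
        self._buffer = bytearray()

    def feed(self, chunk: bytes) -> None:
        self._buffer += chunk

    def next_packet(self) -> Any:
        """Return the next packet; raise LookupError if no full line is buffered."""
        line, sep, rest = bytes(self._buffer).partition(b"\n")
        if not sep:
            raise LookupError("incomplete packet, more data is needed")
        # Whatever follows the first full line is kept for the next call.
        # The line is consumed even if it turns out to be invalid.
        self._buffer = bytearray(rest)
        # json.loads() raises ValueError (JSONDecodeError) on invalid data.
        return json.loads(line)
```

Because an invalid line is dropped before the error is raised, the next call can still return the packets that follow it.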
Receiving Multiple Packets At Once
You can use iter_received_packets() to collect all the received packets into a sequence or a set.
all_packets = [p for p in client.iter_received_packets()]
all_packets = [p async for p in client.iter_received_packets()]
The timeout parameter defaults to zero to get only the data already in the buffer, but you can change it.
all_packets = [p for p in client.iter_received_packets(timeout=1)]
See also
TCPNetworkClient.iter_received_packets()
  The method description and usage (especially for the timeout parameter).
all_packets = [p async for p in client.iter_received_packets(timeout=1)]
See also
AsyncTCPNetworkClient.iter_received_packets()
  The method description and usage (especially for the timeout parameter).
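The effect of a zero timeout ("only what is already there") can be approximated on a plain socket with select(). The generator below is a hypothetical sketch assuming newline-delimited JSON packets; how the real method accounts for the timeout across iterations is described in the references above, not here. Unlike a real client, this sketch discards an incomplete trailing packet when it stops.

```python
from __future__ import annotations

import json
import select
import socket
from collections.abc import Iterator
from typing import Any


def iter_available_packets(sock: socket.socket, timeout: float = 0.0) -> Iterator[Any]:
    """Yield JSON-line packets until no data arrives within `timeout` seconds."""
    buffer = b""
    while True:
        # Hand out every complete line before touching the socket again.
        while b"\n" in buffer:
            line, _, buffer = buffer.partition(b"\n")
            yield json.loads(line)
        # With timeout=0, select() only reports data that is already there.
        readable, _, _ = select.select([sock], [], [], timeout)
        if not readable:
            return
        chunk = sock.recv(4096)
        if not chunk:  # the peer closed the connection
            return
        buffer += chunk
```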
Close The Write-End Stream
If you are sure you will never call send_packet() again, you can call send_eof() to shut down the write stream.
client.send_eof()
await client.send_eof()
Note
send_eof() will block until all unsent data has been flushed before closing the stream.
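At the socket level, closing only the write end is usually done with shutdown(SHUT_WR). The following sketch uses a local socket pair to show the effect: the peer sees end-of-stream, yet the side that shut down can still receive. Whether send_eof() maps exactly to this call is an assumption; half_close_roundtrip is a hypothetical demo function.

```python
from __future__ import annotations

import socket


def half_close_roundtrip() -> tuple[bytes, bytes]:
    """Send a request, close the write end, then still read the reply."""
    client, server = socket.socketpair()
    with client, server:
        client.sendall(b"last request")
        # Close only the write direction; reading on `client` still works.
        client.shutdown(socket.SHUT_WR)

        request = b""
        while chunk := server.recv(1024):  # b"" signals end-of-stream
            request += chunk

        server.sendall(b"reply")
        reply = client.recv(1024)
    return request, reply
```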
Low-Level Socket Operations
For low-level operations such as setsockopt(), the client object exposes the socket through a SocketProxy (the call is the same for the synchronous and asynchronous clients):
client.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, False)
Warning
With the asynchronous client, make sure that wait_connected() has been called beforehand.
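For readers unfamiliar with the option used above, this sketch toggles TCP_NODELAY on a plain standard-library socket and reads the value back. set_tcp_nodelay is a hypothetical helper; it assumes the object it receives supports setsockopt() and getsockopt() like socket.socket does.

```python
from __future__ import annotations

import socket


def set_tcp_nodelay(sock: socket.socket, enabled: bool) -> bool:
    """Enable or disable TCP_NODELAY and return the value now in effect."""
    # TCP_NODELAY=True disables Nagle's algorithm (small writes are sent at once).
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, enabled)
    return sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) != 0
```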
socket.recv() Buffer Size
By default, the client uses a reasonable buffer size when calling recv_packet().
You can control this value by setting the max_recv_size parameter:
with TCPNetworkClient(address, protocol, max_recv_size=1024) as client:
    # Only do socket.recv(1024) calls
    packet = client.recv_packet()
async with AsyncTCPNetworkClient(address, protocol, max_recv_size=1024) as client:
    # Only do socket.recv(1024) calls
    packet = await client.recv_packet()
Note
max_recv_size is also used as a size hint for buffered serializers.
See this section for details.
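What a receive size limit means at the socket level: each recv() call returns at most the requested number of bytes, so a large message arrives as several chunks. The sketch below shows this with a hypothetical recv_chunks helper on a plain socket; it says nothing about how the client sizes its own buffer.

```python
from __future__ import annotations

import socket


def recv_chunks(sock: socket.socket, max_recv_size: int) -> list[bytes]:
    """Read until end-of-stream, asking for at most `max_recv_size` bytes per call."""
    chunks: list[bytes] = []
    while chunk := sock.recv(max_recv_size):  # b"" means the peer closed
        chunks.append(chunk)
    return chunks
```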
SSL/TLS Connection
If you want to use SSL to communicate with the remote host, the easiest way is to pass ssl=True:
with TCPNetworkClient(address, protocol, ssl=True) as client:
    client.send_packet({"data": 42})

    packet = client.recv_packet()
async with AsyncTCPNetworkClient(address, protocol, ssl=True) as client:
    await client.send_packet({"data": 42})

    packet = await client.recv_packet()
Note
Since this is a client object, ssl.SSLContext.wrap_socket() and ssl.SSLContext.wrap_bio() are always called with server_side=False.
Danger
You can pass an SSLContext instead, but at this point I expect you to really know what you are doing.
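If you do build your own context, the standard library's ssl.create_default_context() is the usual starting point for a client: it enables certificate verification and hostname checking. The sketch below shows those defaults and a client-side wrap_socket() call. Both helper names are hypothetical, and whether ssl=True produces exactly this context is an assumption.

```python
from __future__ import annotations

import socket
import ssl


def make_client_context() -> ssl.SSLContext:
    """Build a context with the standard library's recommended client settings."""
    # Purpose.SERVER_AUTH is the default purpose: the context is meant to
    # authenticate servers, which is what a client needs.
    return ssl.create_default_context(ssl.Purpose.SERVER_AUTH)


def wrap_client_socket(sock: socket.socket, hostname: str) -> ssl.SSLSocket:
    """Wrap `sock` for client-side TLS; `hostname` is checked against the certificate."""
    context = make_client_context()
    # server_hostname is mandatory when check_hostname is enabled.
    return context.wrap_socket(sock, server_side=False, server_hostname=hostname)
```

Relaxing check_hostname or verify_mode on such a context removes the protection against impersonated servers, which is the main reason to prefer the defaults.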
Concurrency And Multithreading
All client methods are thread-safe. Synchronization follows these rules:
- send_packet() and recv_packet() do not share the same threading.Lock instance.
- close() will not wait for recv_packet().
- The client.socket methods are also thread-safe. This means that you cannot access the underlying socket methods (e.g. getsockopt()) during a write operation.
With the asynchronous client, no method requires external task synchronization (such as asyncio.Lock).
Synchronization follows these rules:
- send_packet() and recv_packet() do not share the same lock instance.
- aclose() will not wait for recv_packet().
SSL/TLS Concurrency Considerations
For safety, concurrent calls to send_packet() and recv_packet() are “disabled” by default when using SSL.
In fact, they share the same synchronization lock.
If you need this feature after all, you can pass ssl_shared_lock=False at object creation.
Danger
But you don’t need it, do you?
client = TCPNetworkClient(
    remote_address,
    protocol,
    ssl=True,
    ssl_shared_lock=False,
)
client = AsyncTCPNetworkClient(
    remote_address,
    protocol,
    ssl=True,
    ssl_shared_lock=False,
)
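The difference between separate and shared locks can be shown with two threading.Lock objects. This is a conceptual sketch only: ClientLocks is a hypothetical class and does not reflect the client's internals. It checks whether a send could start while a receive holds its lock.

```python
from __future__ import annotations

import threading


class ClientLocks:
    """Holds the send and receive locks, either separate or shared."""

    def __init__(self, shared: bool) -> None:
        self.send_lock = threading.Lock()
        # With a shared lock, sending and receiving exclude each other.
        self.recv_lock = self.send_lock if shared else threading.Lock()

    def can_send_during_recv(self) -> bool:
        """Report whether a send could start while a receive is in progress."""
        with self.recv_lock:
            acquired = self.send_lock.acquire(blocking=False)
            if acquired:
                self.send_lock.release()
            return acquired
```

With separate locks the check succeeds, which corresponds to the default rules listed earlier; with a shared lock it fails, which corresponds to the default behavior described for SSL.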