Data Transport Adapters

Low-level asynchronous transports module.


Abstract Base Classes

Low-level asynchronous transports interfaces module.

class easynetwork.lowlevel.api_async.transports.abc.AsyncBaseTransport

Bases: TypedAttributeProvider

Base class for an asynchronous data transport.

async __aexit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) None

Calls aclose().

abstractmethod async aclose() None

Closes the transport.

Warning

aclose() performs a graceful close, waiting for the transport to close.

If aclose() is cancelled, the transport is closed abruptly.

abstractmethod is_closing() bool

Checks if the transport is closed or in the process of being closed.

Returns:

True if the transport is closing.

Return type:

bool

abstractmethod backend() AsyncBackend
Returns:

The backend implementation linked to this transport.

Return type:

AsyncBackend
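
The close semantics above can be sketched with a small stand-in class. DemoTransport is hypothetical and not part of easynetwork; it only mirrors the documented behaviour so that the example runs with the standard library. It also assumes that entering the context manager returns the transport itself, which this page does not state.

```python
import asyncio


class DemoTransport:
    """Hypothetical stand-in that mirrors the documented close semantics."""

    def __init__(self) -> None:
        self._closing = False

    async def __aenter__(self) -> "DemoTransport":
        # Assumption: entering the context yields the transport itself.
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        # Mirrors the documented behaviour: leaving the block calls aclose().
        await self.aclose()

    async def aclose(self) -> None:
        self._closing = True
        # A real transport would wait here for the graceful shutdown.
        await asyncio.sleep(0)

    def is_closing(self) -> bool:
        return self._closing


async def demo() -> tuple:
    async with DemoTransport() as transport:
        inside = transport.is_closing()
    # Returns the state observed inside the block and after leaving it.
    return inside, transport.is_closing()
```

Running asyncio.run(demo()) reports the transport as open inside the block and closing once the block has exited.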

class easynetwork.lowlevel.api_async.transports.abc.AsyncDatagramListener

Bases: AsyncBaseTransport, Generic[_T_Address]

An interface specialized for objects that let you handle incoming datagrams from anywhere.

abstractmethod async serve(handler: Callable[[bytes, _T_Address], Coroutine[Any, Any, None]], task_group: TaskGroup | None = None) NoReturn

Receive incoming datagrams as they come in and start tasks to handle them.

Important

The implementation must ensure that datagrams are processed in the order in which they are received.

Parameters:
  • handler (Callable[[bytes, _T_Address], Coroutine[Any, Any, None]]) – a callable that will be used to handle each received datagram.

  • task_group (TaskGroup | None) – the task group that will be used to start tasks for handling each received datagram.

Return type:

NoReturn
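
As a rough illustration of the ordering requirement, the sketch below uses a hypothetical QueueDatagramListener fed from an in-memory queue instead of a socket. It simplifies the documented contract: the task_group parameter is omitted and the handler is awaited inline rather than started in a task, which is merely the simplest way to keep datagrams in arrival order. The Address alias is also an assumption.

```python
import asyncio
from collections.abc import Callable, Coroutine
from typing import Any

Address = tuple[str, int]  # hypothetical address type for this sketch
Handler = Callable[[bytes, Address], Coroutine[Any, Any, None]]


class QueueDatagramListener:
    """Hypothetical listener fed from an in-memory queue instead of a socket."""

    def __init__(self) -> None:
        self.incoming = asyncio.Queue()

    async def serve(self, handler: Handler) -> None:
        # Runs until cancelled. Awaiting each handler call before fetching
        # the next datagram keeps processing in arrival order.
        while True:
            data, address = await self.incoming.get()
            await handler(data, address)


async def demo() -> list:
    listener = QueueDatagramListener()
    seen = []

    async def handler(data: bytes, address: Address) -> None:
        seen.append(data)

    for payload in (b"one", b"two", b"three"):
        listener.incoming.put_nowait((payload, ("127.0.0.1", 9000)))

    server = asyncio.create_task(listener.serve(handler))
    # Give the serve loop a moment to drain the queue, then stop it.
    await asyncio.sleep(0.01)
    server.cancel()
    try:
        await server
    except asyncio.CancelledError:
        pass
    return seen
```

Because serve() never returns on its own, the caller stops it by cancelling the task that runs it.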

async serve_with_ancillary(handler: Callable[[bytes, Any | None, _T_Address], Coroutine[Any, Any, None]], ancillary_bufsize: int, task_group: TaskGroup | None = None) NoReturn

Receive incoming datagrams with ancillary data as they come in and start tasks to handle them.

Added in version 1.2.

Important

The implementation must ensure that datagrams are processed in the order in which they are received.

Parameters:
  • handler (Callable[[bytes, Any | None, _T_Address], Coroutine[Any, Any, None]]) – a callable that will be used to handle each received datagram. The ancillary data can be None if there is none.

  • ancillary_bufsize (int) – the maximum buffer size for ancillary data.

  • task_group (TaskGroup | None) – the task group that will be used to start tasks for handling each received datagram.

Raises:

UnsupportedOperation – This transport does not have ancillary data support.

Return type:

NoReturn

abstractmethod async send_to(data: bytes | bytearray | memoryview, address: _T_Address) None

Send the data bytes to the remote peer address.

Important

This method should be safe to call from multiple tasks.

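Parameters:
  • data (bytes | bytearray | memoryview) – the bytes to send.

  • address (_T_Address) – the remote peer.
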
Raises:

OSError – Data too big to be sent at once.

async send_with_ancillary_to(data: bytes | bytearray | memoryview, ancillary_data: Any, address: _T_Address) None

Send the data bytes to the remote peer address with ancillary data.

Important

This method should be safe to call from multiple tasks.

Parameters:
  • data (bytes | bytearray | memoryview) – the bytes to send.

  • ancillary_data (Any) – The ancillary data to send along with the message.

  • address (_T_Address) – the remote peer.

Raises:
class easynetwork.lowlevel.api_async.transports.abc.AsyncDatagramReadTransport

Bases: AsyncBaseTransport

An asynchronous reader transport of unreliable packets of data.

abstractmethod async recv() bytes

Read and return the next available packet.

Returns:

some bytes.

Return type:

bytes

async recv_with_ancillary(ancillary_bufsize: int) tuple[bytes, Any]

Read and return the next available packet with ancillary data.

Added in version 1.2.

Parameters:

ancillary_bufsize (int) – the maximum buffer size for ancillary data.

Raises:
Returns:

a tuple with some bytes and the ancillary data.

Return type:

tuple[bytes, Any]

class easynetwork.lowlevel.api_async.transports.abc.AsyncDatagramTransport

Bases: AsyncDatagramWriteTransport, AsyncDatagramReadTransport

An asynchronous transport of unreliable packets of data.

class easynetwork.lowlevel.api_async.transports.abc.AsyncDatagramWriteTransport

Bases: AsyncBaseTransport

An asynchronous writer transport of unreliable packets of data.

abstractmethod async send(data: bytes | bytearray | memoryview) None

Send the data bytes to the remote peer.

Parameters:

data (bytes | bytearray | memoryview) – the bytes to send.

Raises:

OSError – Data too big to be sent at once.

async send_with_ancillary(data: bytes | bytearray | memoryview, ancillary_data: Any) None

Send the data bytes to the remote peer with ancillary data.

Added in version 1.2.

Parameters:
  • data (bytes | bytearray | memoryview) – the bytes to send.

  • ancillary_data (Any) – The ancillary data to send along with the message.

Raises:
class easynetwork.lowlevel.api_async.transports.abc.AsyncListener

Bases: AsyncBaseTransport, Generic[_T_co]

An interface for objects that let you accept incoming connections.

abstractmethod async serve(handler: Callable[[_T_co], Coroutine[Any, Any, None]], task_group: TaskGroup | None = None) NoReturn

Accept incoming connections as they come in and start tasks to handle them.

Parameters:
  • handler (Callable[[_T_co], Coroutine[Any, Any, None]]) – a callable that will be used to handle each accepted connection.

  • task_group (TaskGroup | None) – the task group that will be used to start tasks for handling each accepted connection.

Return type:

NoReturn

class easynetwork.lowlevel.api_async.transports.abc.AsyncStreamReadTransport

Bases: AsyncBaseTransport

An asynchronous continuous stream data reader transport.

async recv(bufsize: int) bytes

Read and return up to bufsize bytes.

Parameters:

bufsize (int) – the maximum buffer size.

Raises:

ValueError – Negative bufsize.

Returns:

some bytes.

If bufsize is greater than zero and an empty byte buffer is returned, this indicates an EOF.

Return type:

bytes
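
The EOF rule lends itself to a simple read loop. The sketch below is an illustration only: ChunkedReader and read_until_eof are hypothetical names, and the reader is an in-memory stand-in that follows the documented recv(bufsize) contract.

```python
import asyncio


class ChunkedReader:
    """Hypothetical stand-in exposing the documented recv(bufsize) contract."""

    def __init__(self, payload: bytes) -> None:
        self._payload = payload

    async def recv(self, bufsize: int) -> bytes:
        if bufsize < 0:
            raise ValueError("bufsize must not be negative")
        chunk, self._payload = self._payload[:bufsize], self._payload[bufsize:]
        return chunk


async def read_until_eof(transport, bufsize: int = 4) -> bytes:
    chunks = []
    while True:
        chunk = await transport.recv(bufsize)
        if not chunk:  # empty result with bufsize > 0 means EOF
            break
        chunks.append(chunk)
    return b"".join(chunks)
```

For example, asyncio.run(read_until_eof(ChunkedReader(b"hello world"))) collects the payload in chunks of at most four bytes and stops at the first empty read.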

abstractmethod async recv_into(buffer: bytearray | memoryview | collections.abc.Buffer) int

Read into the given buffer.

Parameters:

buffer (bytearray | memoryview | collections.abc.Buffer) – where to write the received bytes.

Returns:

the number of bytes written.

Returning 0 for a non-zero buffer indicates an EOF.

Return type:

int
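
A possible use of recv_into() is filling a preallocated buffer without intermediate copies. The sketch below assumes a hypothetical BufferedReader stand-in (with a made-up max_chunk limit to simulate partial reads) and a hypothetical helper read_exactly; neither is part of the library.

```python
import asyncio


class BufferedReader:
    """Hypothetical stand-in exposing the documented recv_into(buffer) contract."""

    def __init__(self, payload: bytes, max_chunk: int = 3) -> None:
        self._payload = payload
        self._max_chunk = max_chunk  # simulates short reads

    async def recv_into(self, buffer) -> int:
        with memoryview(buffer) as view:
            nbytes = min(len(view), len(self._payload), self._max_chunk)
            view[:nbytes] = self._payload[:nbytes]
        self._payload = self._payload[nbytes:]
        return nbytes


async def read_exactly(transport, size: int) -> bytes:
    buffer = bytearray(size)
    view = memoryview(buffer)
    received = 0
    while received < size:
        # Pass the unfilled tail of the buffer to the transport.
        nbytes = await transport.recv_into(view[received:])
        if nbytes == 0:  # 0 bytes written into a non-empty buffer means EOF
            raise EOFError(f"stream ended after {received} of {size} bytes")
        received += nbytes
    return bytes(buffer)
```

The loop only ever hands a non-empty slice to recv_into(), so a return value of 0 can be treated as EOF unambiguously.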

async recv_with_ancillary(bufsize: int, ancillary_bufsize: int) tuple[bytes, Any]

Read and return up to bufsize bytes with ancillary data.

Added in version 1.2.

Parameters:
  • bufsize (int) – the maximum buffer size.

  • ancillary_bufsize (int) – the maximum buffer size for ancillary data.

Raises:
Returns:

a tuple with some bytes and the ancillary data.

If bufsize is greater than zero and an empty byte buffer is returned, this indicates an EOF.

Return type:

tuple[bytes, Any]

async recv_with_ancillary_into(buffer: bytearray | memoryview | collections.abc.Buffer, ancillary_bufsize: int) tuple[int, Any]

Read into the given buffer with ancillary data.

Added in version 1.2.

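Parameters:
  • buffer (bytearray | memoryview | collections.abc.Buffer) – where to write the received bytes.

  • ancillary_bufsize (int) – the maximum buffer size for ancillary data.
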
Raises:
Returns:

a tuple with the number of bytes written and the ancillary data.

Returning 0 for a non-zero buffer indicates an EOF.

Return type:

tuple[int, Any]

class easynetwork.lowlevel.api_async.transports.abc.AsyncStreamTransport

Bases: AsyncStreamWriteTransport, AsyncStreamReadTransport

An asynchronous continuous stream data transport.

abstractmethod async send_eof() None

Closes the write end of the stream after the buffered write data is flushed.

class easynetwork.lowlevel.api_async.transports.abc.AsyncStreamWriteTransport

Bases: AsyncBaseTransport

An asynchronous continuous stream data writer transport.

abstractmethod async send_all(data: bytes | bytearray | memoryview) None

Send the data bytes to the remote peer.

Parameters:

data (bytes | bytearray | memoryview) – the bytes to send.

async send_all_from_iterable(iterable_of_data: Iterable[bytes | bytearray | memoryview]) None

An efficient way to send a bunch of data via the transport.

Like send_all(), this method continues to send data from the iterable until either all data has been sent or an error occurs. None is returned on success. On error, an exception is raised, and there is no way to determine how much data, if any, was successfully sent.

Parameters:

iterable_of_data (Iterable[bytes | bytearray | memoryview]) – An iterable yielding the bytes to send.
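
A simple fallback for this method could forward each chunk to send_all() in order. The sketch below shows that idea with a hypothetical RecordingWriter; it is an assumption about one reasonable implementation, not a description of what the library or any concrete transport actually does (a real transport may batch the chunks into a single system call).

```python
import asyncio
from collections.abc import Iterable


class RecordingWriter:
    """Hypothetical writer that records what was sent instead of using a socket."""

    def __init__(self) -> None:
        self.sent = bytearray()

    async def send_all(self, data) -> None:
        self.sent += data

    async def send_all_from_iterable(self, iterable_of_data: Iterable) -> None:
        # Assumption: a straightforward fallback that forwards each chunk in order.
        for data in iterable_of_data:
            await self.send_all(data)


async def demo() -> bytes:
    writer = RecordingWriter()
    await writer.send_all_from_iterable(
        [b"GET ", bytearray(b"/ "), memoryview(b"HTTP/1.1")]
    )
    return bytes(writer.sent)
```

The iterable may mix bytes, bytearray and memoryview objects, matching the documented parameter type.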

async send_all_with_ancillary(iterable_of_data: Iterable[bytes | bytearray | memoryview], ancillary_data: Any) None

An efficient way to send a bunch of data via the transport with ancillary data.

Unlike send_all() and send_all_from_iterable(), this method tries to send all data at once and raises an exception if it cannot. None is returned on success. On error, there is no way to determine how much data, if any, was successfully sent.

Added in version 1.2.

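Parameters:
  • iterable_of_data (Iterable[bytes | bytearray | memoryview]) – An iterable yielding the bytes to send.

  • ancillary_data (Any) – The ancillary data to send along with the message.
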
Raises:

SSL/TLS Support

Low-level asynchronous SSL transports module.

To use this module, the standard ssl module must be available.

class easynetwork.lowlevel.api_async.transports.tls.AsyncTLSListener

Bases: AsyncListener[AsyncTLSStreamTransport]

Listener with SSL/TLS wrapper for a continuous stream transport.

__init__(listener: AsyncListener[AsyncStreamTransport], ssl_context: ssl.SSLContext, *, handshake_timeout: float | None = None, shutdown_timeout: float | None = None, standard_compatible: bool = True, handshake_error_handler: Callable[[Exception], None] | None = None) None

Changed in version 1.1: Added handshake_error_handler parameter.

Parameters:
  • listener (AsyncListener[AsyncStreamTransport]) – The listener to wrap.

  • ssl_context (ssl.SSLContext) – a ssl.SSLContext object to use to create the client transport.

  • handshake_timeout (float | None) – The time in seconds to wait for the TLS handshake to complete before aborting the connection. 60.0 seconds if None (default).

  • shutdown_timeout (float | None) – The time in seconds to wait for the SSL shutdown to complete before aborting the connection. 30.0 seconds if None (default).

  • standard_compatible (bool) – If False, skip the closing handshake when closing the connection, and don’t raise an exception if the peer does the same.

  • handshake_error_handler (Callable[[Exception], None] | None) – a callback to be used on handshake failure. By default, emits a warning log.

is_closing() bool

Checks if the transport is closed or in the process of being closed.

Returns:

True if the transport is closing.

Return type:

bool

async aclose() None

Closes the transport.

Warning

aclose() performs a graceful close, waiting for the transport to close.

If aclose() is cancelled, the transport is closed abruptly.

async serve(handler: Callable[[AsyncTLSStreamTransport], Coroutine[Any, Any, None]], task_group: TaskGroup | None = None) NoReturn

Accept incoming connections as they come in and start tasks to handle them.

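Parameters:
  • handler (Callable[[AsyncTLSStreamTransport], Coroutine[Any, Any, None]]) – a callable that will be used to handle each accepted connection.

  • task_group (TaskGroup | None) – the task group that will be used to start tasks for handling each accepted connection.
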
Return type:

NoReturn

backend() AsyncBackend
Returns:

The backend implementation linked to this transport.

Return type:

AsyncBackend

property extra_attributes: Mapping[Any, Callable[[], Any]]

A mapping of the extra attributes to callables that return the corresponding values.

If the provider wraps another provider, the attributes from that wrapper should also be included in the returned mapping (but the wrapper may override the callables from the wrapped instance).

The callables should raise TypedAttributeLookupError if it is not possible to get the value.

class easynetwork.lowlevel.api_async.transports.tls.AsyncTLSStreamTransport

Bases: AsyncStreamTransport

SSL/TLS wrapper for a continuous stream transport.

async classmethod wrap(transport: AsyncStreamTransport, ssl_context: SSLContext, *, handshake_timeout: float | None = None, shutdown_timeout: float | None = None, server_side: bool | None = None, server_hostname: str | None = None, standard_compatible: bool = True, session: SSLSession | None = None) Self
Parameters:
  • transport (AsyncStreamTransport) – The transport to wrap.

  • ssl_context (SSLContext) – a ssl.SSLContext object to use to create the transport.

  • handshake_timeout (float | None) – The time in seconds to wait for the TLS handshake to complete before aborting the connection. 60.0 seconds if None (default).

  • shutdown_timeout (float | None) – The time in seconds to wait for the SSL shutdown to complete before aborting the connection. 30.0 seconds if None (default).

  • server_side (bool | None) – Indicates whether we are a client or a server for the handshake part. If it is set to None, it is deduced according to server_hostname.

  • server_hostname (str | None) – sets or overrides the hostname that the target server’s certificate will be matched against. If server_side is True, you must pass a value for server_hostname.

  • standard_compatible (bool) – If False, skip the closing handshake when closing the connection, and don’t raise an exception if the peer does the same.

  • session (SSLSession | None) – If an SSL session already exists, use it instead.

Return type:

Self
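
A client-side upgrade might look like the sketch below. The helper names make_client_context and upgrade_to_tls are hypothetical, and the 10-second handshake timeout is an arbitrary choice; only the wrap() parameters come from the signature above. The easynetwork import is deferred into the function so the snippet can be loaded without the library installed, and the transport argument is assumed to be an already connected AsyncStreamTransport.

```python
import ssl


def make_client_context() -> ssl.SSLContext:
    # Standard-library defaults: certificate verification and
    # hostname checking are both enabled.
    return ssl.create_default_context()


async def upgrade_to_tls(transport, hostname: str):
    # Imported lazily so that this sketch loads without easynetwork installed.
    from easynetwork.lowlevel.api_async.transports.tls import AsyncTLSStreamTransport

    return await AsyncTLSStreamTransport.wrap(
        transport,
        make_client_context(),
        server_hostname=hostname,  # checked against the server certificate
        handshake_timeout=10.0,  # arbitrary value for this sketch
    )
```

Since server_side is left at None here, it is deduced from server_hostname as described in the parameter list.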

is_closing() bool

Checks if the transport is closed or in the process of being closed.

Returns:

True if the transport is closing.

Return type:

bool

async aclose() None

Closes the transport.

Warning

aclose() performs a graceful close, waiting for the transport to close.

If aclose() is cancelled, the transport is closed abruptly.

backend() AsyncBackend
Returns:

The backend implementation linked to this transport.

Return type:

AsyncBackend

async recv(bufsize: int) bytes

Read and return up to bufsize bytes.

Parameters:

bufsize (int) – the maximum buffer size.

Raises:

ValueError – Negative bufsize.

Returns:

some bytes.

If bufsize is greater than zero and an empty byte buffer is returned, this indicates an EOF.

Return type:

bytes

async recv_into(buffer: bytearray | memoryview | collections.abc.Buffer) int

Read into the given buffer.

Parameters:

buffer (bytearray | memoryview | collections.abc.Buffer) – where to write the received bytes.

Returns:

the number of bytes written.

Returning 0 for a non-zero buffer indicates an EOF.

Return type:

int

async send_all(data: bytes | bytearray | memoryview) None

Send the data bytes to the remote peer.

Parameters:

data (bytes | bytearray | memoryview) – the bytes to send.

async send_all_from_iterable(iterable_of_data: Iterable[bytes | bytearray | memoryview]) None

An efficient way to send a bunch of data via the transport.

Like send_all(), this method continues to send data from the iterable until either all data has been sent or an error occurs. None is returned on success. On error, an exception is raised, and there is no way to determine how much data, if any, was successfully sent.

Parameters:

iterable_of_data (Iterable[bytes | bytearray | memoryview]) – An iterable yielding the bytes to send.

async send_eof() None

Closes the write end of the stream after the buffered write data is flushed.

Raises:

UnsupportedOperation – SSL/TLS API does not support sending EOF (for now).

property extra_attributes: Mapping[Any, Callable[[], Any]]

A mapping of the extra attributes to callables that return the corresponding values.

If the provider wraps another provider, the attributes from that wrapper should also be included in the returned mapping (but the wrapper may override the callables from the wrapped instance).

The callables should raise TypedAttributeLookupError if it is not possible to get the value.

Composite Data Transports

Low-level asynchronous transport composite module.

Added in version 1.1.

final class easynetwork.lowlevel.api_async.transports.composite.AsyncStapledDatagramTransport

Bases: AsyncDatagramTransport, Generic[_T_SendDatagramTransport, _T_ReceiveDatagramTransport]

An asynchronous transport of unreliable packets of data that merges two transports.

Extra attributes will be provided from both transports, with the receive transport providing the values in case of a conflict.

Added in version 1.1.

send_transport: _T_SendDatagramTransport

The write part of the transport.

receive_transport: _T_ReceiveDatagramTransport

The read part of the transport.

async aclose() None

Closes both transports.

Warning

aclose() performs a graceful close, waiting for the transports to close.

If aclose() is cancelled, the transports are closed using aclose_forcefully().

is_closing() bool

Checks if both transports are closed or in the process of being closed.

Returns:

True if the transports are closing.

Return type:

bool

async recv() bytes

Calls self.receive_transport.recv().

Return type:

bytes

async recv_with_ancillary(ancillary_bufsize: int) tuple[bytes, Any]

Calls self.receive_transport.recv_with_ancillary().

Added in version 1.2.

Return type:

tuple[bytes, Any]

async send(data: bytes | bytearray | memoryview) None

Calls self.send_transport.send().

async send_with_ancillary(data: bytes | bytearray | memoryview, ancillary_data: Any) None

Calls self.send_transport.send_with_ancillary().

Added in version 1.2.

backend() AsyncBackend
Returns:

The backend implementation linked to this transport.

Return type:

AsyncBackend

property extra_attributes: Mapping[Any, Callable[[], Any]]

A mapping of the extra attributes to callables that return the corresponding values.

If the provider wraps another provider, the attributes from that wrapper should also be included in the returned mapping (but the wrapper may override the callables from the wrapped instance).

The callables should raise TypedAttributeLookupError if it is not possible to get the value.
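
The delegation and attribute-merging rules of a stapled transport can be illustrated with a simplified model. QueueHalf and StapledSketch below are hypothetical stand-ins, not the library classes; the way aclose() closes both halves is an assumption, and the graceful/forceful distinction is not modelled.

```python
import asyncio


class QueueHalf:
    """Hypothetical one-directional datagram transport backed by a queue."""

    def __init__(self, queue, attributes) -> None:
        self._queue = queue
        self._closing = False
        self.extra_attributes = attributes

    async def send(self, data) -> None:
        self._queue.put_nowait(bytes(data))

    async def recv(self) -> bytes:
        return await self._queue.get()

    async def aclose(self) -> None:
        self._closing = True

    def is_closing(self) -> bool:
        return self._closing


class StapledSketch:
    """Simplified illustration of the delegation described above."""

    def __init__(self, send_transport, receive_transport) -> None:
        self.send_transport = send_transport
        self.receive_transport = receive_transport

    async def send(self, data) -> None:
        await self.send_transport.send(data)

    async def recv(self) -> bytes:
        return await self.receive_transport.recv()

    async def aclose(self) -> None:
        # Assumption: both halves are closed even if the first close fails.
        try:
            await self.send_transport.aclose()
        finally:
            await self.receive_transport.aclose()

    def is_closing(self) -> bool:
        return self.send_transport.is_closing() and self.receive_transport.is_closing()

    @property
    def extra_attributes(self):
        # In a dict merge the later mapping wins, so the receive side
        # takes precedence when both halves define the same key.
        return {
            **self.send_transport.extra_attributes,
            **self.receive_transport.extra_attributes,
        }


async def demo():
    queue = asyncio.Queue()
    sender = QueueHalf(queue, {"role": lambda: "send", "mtu": lambda: 1500})
    receiver = QueueHalf(queue, {"role": lambda: "receive"})
    stapled = StapledSketch(sender, receiver)
    await stapled.send(b"ping")
    packet = await stapled.recv()
    attributes = {key: getter() for key, getter in stapled.extra_attributes.items()}
    await stapled.aclose()
    return packet, attributes, stapled.is_closing()
```

In the demo, both halves define "role", and the merged mapping resolves it from the receive side while keeping "mtu" from the send side.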

final class easynetwork.lowlevel.api_async.transports.composite.AsyncStapledStreamTransport

Bases: AsyncStreamTransport, Generic[_T_SendStreamTransport, _T_ReceiveStreamTransport]

An asynchronous continuous stream data transport that merges two transports.

Extra attributes will be provided from both transports, with the receive transport providing the values in case of a conflict.

Added in version 1.1.

send_transport: _T_SendStreamTransport

The write part of the transport.

receive_transport: _T_ReceiveStreamTransport

The read part of the transport.

async aclose() None

Closes both transports.

Warning

aclose() performs a graceful close, waiting for the transports to close.

If aclose() is cancelled, the transports are closed using aclose_forcefully().

is_closing() bool

Checks if both transports are closed or in the process of being closed.

Returns:

True if the transports are closing.

Return type:

bool

async recv(bufsize: int) bytes

Calls self.receive_transport.recv().

Return type:

bytes

async recv_into(buffer: bytearray | memoryview | collections.abc.Buffer) int

Calls self.receive_transport.recv_into().

Return type:

int

async recv_with_ancillary(bufsize: int, ancillary_bufsize: int) tuple[bytes, Any]

Calls self.receive_transport.recv_with_ancillary().

Added in version 1.2.

Return type:

tuple[bytes, Any]

async recv_with_ancillary_into(buffer: bytearray | memoryview | collections.abc.Buffer, ancillary_bufsize: int) tuple[int, Any]

Calls self.receive_transport.recv_with_ancillary_into().

Added in version 1.2.

Return type:

tuple[int, Any]

async send_all(data: bytes | bytearray | memoryview) None

Calls self.send_transport.send_all().

async send_all_from_iterable(iterable_of_data: Iterable[bytes | bytearray | memoryview]) None

Calls self.send_transport.send_all_from_iterable().

async send_all_with_ancillary(iterable_of_data: Iterable[bytes | bytearray | memoryview], ancillary_data: Any) None

Calls self.send_transport.send_all_with_ancillary().

Added in version 1.2.

async send_eof() None

Closes the write end of the stream after the buffered write data is flushed.

If self.send_transport.send_eof() exists, then this calls it. Otherwise, this calls self.send_transport.aclose().

Note

This method handles the case where self.send_transport.send_eof() raises NotImplementedError or UnsupportedOperation; self.send_transport.aclose() will be called as a fallback.
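
The fallback logic can be sketched as follows. The UnsupportedOperation class here is a local stand-in for the library exception of the same name, and the writer classes and send_eof_or_close helper are hypothetical; the actual implementation may differ in detail.

```python
import asyncio


class UnsupportedOperation(Exception):
    """Local stand-in for the library's exception of the same name."""


class HalfCloseWriter:
    """Hypothetical writer that supports closing only its write end."""

    def __init__(self) -> None:
        self.eof_sent = False
        self.closed = False

    async def send_eof(self) -> None:
        self.eof_sent = True

    async def aclose(self) -> None:
        self.closed = True


class TLSLikeWriter(HalfCloseWriter):
    """Hypothetical writer that refuses to send EOF."""

    async def send_eof(self) -> None:
        raise UnsupportedOperation("sending EOF is not supported")


async def send_eof_or_close(send_transport) -> None:
    send_eof = getattr(send_transport, "send_eof", None)
    if send_eof is None:
        # No send_eof() at all: closing is the only way to signal the end.
        await send_transport.aclose()
        return
    try:
        await send_eof()
    except (NotImplementedError, UnsupportedOperation):
        # send_eof() exists but cannot be used: fall back to a full close.
        await send_transport.aclose()


async def demo():
    plain, tls_like = HalfCloseWriter(), TLSLikeWriter()
    await send_eof_or_close(plain)
    await send_eof_or_close(tls_like)
    return (plain.eof_sent, plain.closed), (tls_like.eof_sent, tls_like.closed)
```

The first writer is only half-closed, while the second ends up fully closed because its send_eof() is unusable.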

backend() AsyncBackend
Returns:

The backend implementation linked to this transport.

Return type:

AsyncBackend

property extra_attributes: Mapping[Any, Callable[[], Any]]

A mapping of the extra attributes to callables that return the corresponding values.

If the provider wraps another provider, the attributes from that wrapper should also be included in the returned mapping (but the wrapper may override the callables from the wrapped instance).

The callables should raise TypedAttributeLookupError if it is not possible to get the value.

easynetwork.lowlevel.api_async.transports.composite._T_SendStreamTransport

Type:    TypeVar

Invariant TypeVar bound to easynetwork.lowlevel.api_async.transports.abc.AsyncStreamWriteTransport.

easynetwork.lowlevel.api_async.transports.composite._T_ReceiveStreamTransport

Type:    TypeVar

Invariant TypeVar bound to easynetwork.lowlevel.api_async.transports.abc.AsyncStreamReadTransport.

easynetwork.lowlevel.api_async.transports.composite._T_SendDatagramTransport

Type:    TypeVar

Invariant TypeVar bound to easynetwork.lowlevel.api_async.transports.abc.AsyncDatagramWriteTransport.

easynetwork.lowlevel.api_async.transports.composite._T_ReceiveDatagramTransport

Type:    TypeVar

Invariant TypeVar bound to easynetwork.lowlevel.api_async.transports.abc.AsyncDatagramReadTransport.

Miscellaneous

Low-level asynchronous transports tools module.

async easynetwork.lowlevel.api_async.transports.utils.aclose_forcefully(transport: _TransportLike) None

Close an async transport immediately, without blocking to do any graceful cleanup.

Changed in version 1.1: transport can now be any closeable object with a backend() method.

Parameters:

transport (_TransportLike) – the transport to close.
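
One way to obtain this effect, consistent with the note that a cancelled aclose() closes the transport abruptly, is to start aclose() and cancel it right away. The sketch below uses plain asyncio and hypothetical names (SlowClosingTransport, close_without_waiting); it is not necessarily how the library implements aclose_forcefully(), which works through the transport's backend.

```python
import asyncio


class SlowClosingTransport:
    """Hypothetical transport whose graceful close takes a long time."""

    def __init__(self) -> None:
        self.closed = False
        self.graceful = False

    async def aclose(self) -> None:
        try:
            await asyncio.sleep(3600)  # stands in for a long graceful shutdown
            self.graceful = True
        finally:
            # Cancellation lands here: release the resource immediately.
            self.closed = True


async def close_without_waiting(transport) -> None:
    task = asyncio.ensure_future(transport.aclose())
    await asyncio.sleep(0)  # let aclose() reach its first suspension point
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected here. Note that this sketch would also swallow a
        # cancellation aimed at the caller, which production code should avoid.
        pass


async def demo():
    transport = SlowClosingTransport()
    await close_without_waiting(transport)
    return transport.closed, transport.graceful
```

The demo returns promptly with the transport closed but without the graceful path having completed.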

protocol easynetwork.lowlevel.api_async.transports.utils._TransportLike

Bases: Protocol

Classes that implement this protocol must have the following methods / attributes:

abstractmethod aclose() Awaitable[None]

Closes the transport.

Warning

aclose() performs a graceful close, waiting for the transport to close.

If aclose() is cancelled, the transport is closed abruptly.

Return type:

Awaitable[None]

abstractmethod backend() AsyncBackend
Returns:

The backend implementation linked to this transport.

Return type:

AsyncBackend