Asynchronous Backend Engine API

Asynchronous backend engine module.


Introduction

In order not to depend on a single implementation of asynchronous operations (e.g. asyncio), here is a mini-framework to manage different implementations and keep EasyNetwork unaware of the library used.

Why not just use anyio directly?

Short answer: Because I don’t want to.

The main problem with anyio is that it already encapsulates the sockets without providing a way to manipulate the underlying transport. This is understandable; exposing the transport would be a horror to maintain.

But as a result, the high-level API does not expose the features I need, such as:

The second problem is having anyio as a dependency. asyncio is part of the standard library, so why would I use an external (and large) project to manage asyncio and make it mandatory? Also, it would be heavier to write asyncio-only code if anyio is not installed.

Usage

Use The Interface Provided By The High-level API

All asynchronous objects relying on an AsyncBackend object have a backend() method:

Obtain An Object By Yourself

You can use new_builtin_backend() to have a backend instance:

>>> from easynetwork.lowlevel.api_async.backend.utils import new_builtin_backend
>>> new_builtin_backend("asyncio")
<AsyncIOBackend object at ...>
>>> new_builtin_backend("trio")
<TrioBackend object at ...>

You can also let sniffio determine which backend should be used via ensure_backend():

>>> from easynetwork.lowlevel.api_async.backend.utils import ensure_backend
>>> import asyncio, trio
>>>
>>> async def main():
...     return ensure_backend(None)
...
>>> asyncio.run(main())
<AsyncIOBackend object at ...>
>>> trio.run(main)
<TrioBackend object at ...>

Backend Interface

Asynchronous backend engine interfaces module.

class easynetwork.lowlevel.api_async.backend.abc.AsyncBackend

Bases: object

Asynchronous backend interface.

It bridges the gap between asynchronous frameworks (asyncio, trio, or whatever) and EasyNetwork.

Runners

abstractmethod AsyncBackend.bootstrap(coro_func: Callable[[Unpack[_T_PosArgs]], Coroutine[Any, Any, _T]], *args: Unpack[_T_PosArgs], runner_options: Mapping[str, Any] | None = ...) _T

Runs an async function, and returns the result.

Calling:

backend.bootstrap(coro_func, *args)

is equivalent to:

await coro_func(*args)

except that bootstrap() can (and must) be called from a synchronous context.

runner_options can be used to give additional parameters to the backend runner. For example:

backend.bootstrap(coro_func, *args, runner_options={"loop_factory": uvloop.new_event_loop})

would act as the following for asyncio:

with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:
    runner.run(coro_func(*args))
Parameters:
Returns:

Whatever await coro_func(*args) returns.

Return type:

_T

Coroutines And Tasks

Sleeping

abstractmethod async AsyncBackend.coro_yield() None

Explicitly introduce a breakpoint to suspend a task.

This checks for cancellation and allows other tasks to be scheduled, without otherwise blocking.

Note

The scheduler has the option of ignoring this and continuing to run the current task if it decides this is appropriate (e.g. for increased efficiency).

abstractmethod async AsyncBackend.sleep(delay: float) None

Pause execution of the current task for the given number of seconds.

Parameters:

delay (float) – The number of seconds to sleep. May be zero to insert a checkpoint without actually blocking.

Raises:

ValueError – if delay is negative or NaN.

abstractmethod async AsyncBackend.sleep_forever() NoReturn

Pause execution of the current task forever (or at least until cancelled).

Equivalent to (but probably more efficient than):

await backend.sleep(math.inf)

Return type:

NoReturn

async AsyncBackend.sleep_until(deadline: float) None

Pause execution of the current task until the given time.

The difference between sleep() and sleep_until() is that the former takes a relative time and the latter takes an absolute time (as returned by current_time()).

Parameters:

deadline (float) – The time at which we should wake up again. May be in the past, in which case this function executes a checkpoint but does not block.
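
The relationship between the relative and absolute forms can be illustrated with plain asyncio. In this sketch, loop.time() stands in for current_time() and sleep_until_sketch is a hypothetical helper; it is an assumption that a backend's sleep_until() behaves along these lines.

```python
import asyncio


async def sleep_until_sketch(deadline: float) -> None:
    """Hypothetical helper: sleep until an absolute time on the loop clock."""
    loop = asyncio.get_running_loop()
    # A deadline in the past gives a zero delay, which still yields to the
    # event loop once before returning.
    await asyncio.sleep(max(deadline - loop.time(), 0.0))


async def main() -> float:
    loop = asyncio.get_running_loop()
    start = loop.time()
    await sleep_until_sketch(start + 0.05)  # absolute deadline, 50 ms ahead
    await sleep_until_sketch(start - 1.0)   # already passed: checkpoint only
    return loop.time() - start


elapsed = asyncio.run(main())
print(f"slept for about {elapsed:.3f}s")
```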

abstractmethod AsyncBackend.current_time() float

Returns the current time according to the scheduler clock.

Returns:

The current time.

Return type:

float

Task Cancellation

abstractmethod AsyncBackend.get_cancelled_exc_class() type[BaseException]

Returns the current async library’s cancellation exception class.

Returns:

An exception class.

Return type:

type[BaseException]

Shielding From Task Cancellation

abstractmethod async AsyncBackend.cancel_shielded_coro_yield() None

Introduce a schedule point, but not a cancel point.

Equivalent to (but probably more efficient than):

await backend.ignore_cancellation(backend.coro_yield())

abstractmethod async AsyncBackend.ignore_cancellation(coroutine: Awaitable[_T_co]) _T_co

Protect a coroutine from being cancelled.

The statement:

res = await backend.ignore_cancellation(something())

is equivalent to:

res = await something()

except that if the coroutine containing it is cancelled, the task running in something() is not cancelled.

Return type:

_T_co
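
The closest standard-library analogue is asyncio.shield(), used below as an illustration only. With asyncio.shield() the cancelled caller resumes immediately while the inner work keeps running; whether a given backend's ignore_cancellation() lets the caller resume at that same moment is not stated here, so treat that detail as an assumption.

```python
import asyncio


async def something(log: list[str]) -> int:
    await asyncio.sleep(0.05)
    log.append("something finished")
    return 42


async def caller(log: list[str], inner: "asyncio.Future[int]") -> int:
    try:
        # Cancelling caller() does not cancel the shielded inner task.
        return await asyncio.shield(inner)
    except asyncio.CancelledError:
        log.append("caller cancelled")
        raise


async def main() -> tuple[list[str], int]:
    log: list[str] = []
    inner = asyncio.ensure_future(something(log))
    outer = asyncio.ensure_future(caller(log, inner))
    await asyncio.sleep(0.01)
    outer.cancel()
    try:
        await outer
    except asyncio.CancelledError:
        pass
    # The inner task was left alone and still produces its result.
    return log, await inner


log, value = asyncio.run(main())
print(log, value)
```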

Creating Concurrent Tasks

async AsyncBackend.gather(*coroutines: Awaitable[_T_co]) list[_T_co]

Run awaitable objects in the coroutines sequence concurrently.

Parameters:

coroutines (Awaitable[_T_co]) – any awaitable object.

Returns:

If all awaitables are completed successfully, the result is an aggregate list of returned values. The order of result values corresponds to the order of awaitables in coroutines.

Raises:

ExceptionGroup – If one or more awaitable(s) fails.

Return type:

list[_T_co]
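
The ordering guarantee can be seen with asyncio.gather(), used here as a stand-in. Note one difference: on failure, asyncio.gather() propagates the first exception, whereas the method documented above raises an ExceptionGroup. The delayed() helper is hypothetical.

```python
import asyncio


async def delayed(value: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return value


async def main() -> list[str]:
    # "b" finishes first and "a" last, yet results follow argument order.
    return await asyncio.gather(
        delayed("a", 0.03),
        delayed("b", 0.01),
        delayed("c", 0.02),
    )


results = asyncio.run(main())
print(results)
```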

abstractmethod AsyncBackend.create_task_group() TaskGroup

Creates a task group.

The most common use is as an asynchronous context manager:

async with backend.create_task_group() as task_group:
    ...

Returns:

A new task group.

Return type:

TaskGroup

class easynetwork.lowlevel.api_async.backend.abc.TaskGroup

Bases: object

Groups several asynchronous tasks together.

Example:

async def main():
    async with backend.create_task_group() as tg:
        tg.start_soon(some_coro)
        tg.start_soon(another_coro)
    print("Both tasks have completed now.")

The async with statement will wait for all tasks in the group to finish. While waiting, new tasks may still be added to the group (for example, by passing tg into one of the coroutines and calling tg.start_soon() in that coroutine). Once the last task has finished and the async with block is exited, no new tasks may be added to the group.

abstractmethod start_soon(coro_func: Callable[[Unpack[_T_PosArgs]], Coroutine[Any, Any, _T]], /, *args: Unpack[_T_PosArgs], name: str | None = ...) None

Schedules the creation of a new task in this task group.

Parameters:
  • coro_func (Callable[[Unpack[_T_PosArgs]], Coroutine[Any, Any, _T]]) – An async function.

  • args (Unpack[_T_PosArgs]) – Positional arguments to be passed to coro_func. If you need to pass keyword arguments, then use functools.partial().

  • name (str | None) – Name of the task, for the purposes of introspection and debugging.

abstractmethod async start(coro_func: Callable[[Unpack[_T_PosArgs]], Coroutine[Any, Any, _T]], /, *args: Unpack[_T_PosArgs], name: str | None = ...) Task[_T]

Starts a new managed task in this task group. Blocks until the event loop starts the task.

Warning

Unlike trio and anyio, there is no task_status parameter.

Parameters:
  • coro_func (Callable[[Unpack[_T_PosArgs]], Coroutine[Any, Any, _T]]) – An async function.

  • args (Unpack[_T_PosArgs]) – Positional arguments to be passed to coro_func. If you need to pass keyword arguments, then use functools.partial().

  • name (str | None) – Name of the task, for the purposes of introspection and debugging.

Returns:

the created task.

Return type:

Task[_T]

class easynetwork.lowlevel.api_async.backend.abc.Task

Bases: Generic[_T_co]

A Task object represents a concurrent “thread” of execution.

abstract property info: TaskInfo

The task data. Read-only attribute.

abstractmethod done() bool

Returns the Task state.

A Task is done when the wrapped coroutine either returned a value, raised an exception, or the Task was cancelled.

Returns:

True if the Task is done.

Return type:

bool

abstractmethod cancel() bool

Request the Task to be cancelled.

This arranges for a backend.get_cancelled_exc_class() exception to be thrown into the wrapped coroutine on the next cycle of the event loop.

Task.cancel() does not guarantee that the Task will be cancelled, although suppressing cancellation completely is not common and is actively discouraged.

Returns:

True if the cancellation request has been taken into account. False if the task is already done.

Return type:

bool

abstractmethod cancelled() bool

Returns the cancellation state.

The Task is cancelled when the cancellation was requested with cancel() and the wrapped coroutine propagated the backend.get_cancelled_exc_class() exception thrown into it.

Returns:

True if the Task is cancelled.

Return type:

bool
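
The return values of cancel(), cancelled() and done() described above mirror those of asyncio.Task, so a plain asyncio sketch can show the expected sequence of states. It is an assumption that every backend reports exactly the same values at each step.

```python
import asyncio


async def main() -> tuple[bool, bool, bool, bool]:
    task = asyncio.ensure_future(asyncio.sleep(3600))
    await asyncio.sleep(0)  # let the task start and block in sleep()

    accepted = task.cancel()  # the task was still running: request accepted
    try:
        await task
    except asyncio.CancelledError:
        pass  # the task propagated the cancellation exception

    too_late = task.cancel()  # the task is already done: nothing to cancel
    return accepted, task.cancelled(), task.done(), too_late


states = asyncio.run(main())
print(states)
```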

abstractmethod async wait() None

Blocks until the task has been completed, but does not unwrap the result.

See the join() method to get the actual task state.

Important

Cancelling Task.wait() does not cancel the task.

abstractmethod async join() _T_co

Blocks until the task has been completed, and returns the result.

Important

Cancelling Task.join() does not cancel the task.

Raises:
  • backend.get_cancelled_exc_class() – The task was cancelled.

  • BaseException – Any exception raised by the task.

Returns:

the task result.

Return type:

_T_co

abstractmethod async join_or_cancel() _T_co

Similar to Task.join() except that if the coroutine is cancelled, the cancellation is propagated to this task.

Roughly equivalent to:

try:
    await task.wait()
except backend.get_cancelled_exc_class():
    task.cancel()
    await backend.ignore_cancellation(task.wait())
    if task.cancelled():
        raise
assert task.done()
return await task.join()

Return type:

_T_co

Introspection

abstractmethod AsyncBackend.get_current_task() TaskInfo

Return the current task.

Returns:

a representation of the current task.

Return type:

TaskInfo

class easynetwork.lowlevel.api_async.backend.abc.TaskInfo

Bases: object

Represents an asynchronous task.

id: int

The unique identifier of the task

name: str

The description of the task (if any)

coro: Coroutine[Any, Any, Any] | None

The coroutine object of the task

Timeouts

abstractmethod AsyncBackend.open_cancel_scope(*, deadline: float = ...) CancelScope

Open a new cancel scope. See move_on_after() for details.

Parameters:

deadline (float) – absolute time to stop waiting. Defaults to math.inf.

Returns:

a new cancel scope.

Return type:

CancelScope

AsyncBackend.move_on_after(delay: float) CancelScope

Returns a new CancelScope that can be used to limit the amount of time spent waiting on something. The deadline is set to now + delay.

Example:

async def long_running_operation(backend):
    await backend.sleep(3600)  # 1 hour

async def main():
    ...

    with backend.move_on_after(10):
        await long_running_operation(backend)

    print("After at most 10 seconds.")

If long_running_operation takes more than 10 seconds to complete, the context manager will cancel the current task and handle the resulting backend.get_cancelled_exc_class() exception internally.

Parameters:

delay (float) – number of seconds to wait. If delay is math.inf, no time limit will be applied; this can be useful if the delay is unknown when the context manager is created. In either case, the context manager can be rescheduled after creation using CancelScope.reschedule().

Returns:

a new cancel scope.

Return type:

CancelScope

AsyncBackend.move_on_at(deadline: float) CancelScope

Similar to move_on_after(), except deadline is the absolute time to stop waiting, or math.inf.

Example:

async def long_running_operation(backend):
    await backend.sleep(3600)  # 1 hour

async def main():
    ...

    deadline = backend.current_time() + 10
    with backend.move_on_at(deadline):
        await long_running_operation(backend)

    print("After at most 10 seconds.")

Parameters:

deadline (float) – absolute time to stop waiting.

Returns:

a new cancel scope.

Return type:

CancelScope

AsyncBackend.timeout(delay: float) AbstractContextManager[CancelScope, None]

Returns a context manager that can be used to limit the amount of time spent waiting on something.

This function and move_on_after() are similar in that both create a context manager with a given timeout, and if the timeout expires then both will cause backend.get_cancelled_exc_class() to be raised within the scope. The difference is that when the exception reaches move_on_after(), it is caught and discarded. When it reaches timeout(), then it is caught and TimeoutError is raised in its place.

Parameters:

delay (float) – number of seconds to wait.

Returns:

a context manager

Return type:

AbstractContextManager[CancelScope, None]
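
The difference in outcome between the two styles can be imitated with asyncio.wait_for(). This sketch does not use cancel scopes at all; it only reproduces what the caller observes in each case.

```python
import asyncio
import contextlib


async def long_running_operation() -> None:
    await asyncio.sleep(3600)  # 1 hour


async def main() -> list[str]:
    outcomes = []

    # move_on_after()-like: the expiry is swallowed and execution continues.
    with contextlib.suppress(asyncio.TimeoutError):
        await asyncio.wait_for(long_running_operation(), 0.01)
    outcomes.append("moved on silently")

    # timeout()-like: the expiry reaches the caller as a timeout error.
    try:
        await asyncio.wait_for(long_running_operation(), 0.01)
    except asyncio.TimeoutError:
        outcomes.append("TimeoutError reached the caller")

    return outcomes


outcomes = asyncio.run(main())
print(outcomes)
```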

AsyncBackend.timeout_at(deadline: float) AbstractContextManager[CancelScope, None]

Returns a context manager that can be used to limit the amount of time spent waiting on something.

This function and move_on_at() are similar in that both create a context manager with a given timeout, and if the timeout expires then both will cause backend.get_cancelled_exc_class() to be raised within the scope. The difference is that when the exception reaches move_on_at(), it is caught and discarded. When it reaches timeout_at(), then it is caught and TimeoutError is raised in its place.

Parameters:

deadline (float) – absolute time to stop waiting.

Returns:

a context manager

Return type:

AbstractContextManager[CancelScope, None]

class easynetwork.lowlevel.api_async.backend.abc.CancelScope

Bases: object

A temporary scope opened by a task that can be used by other tasks to control its execution time.

Warning

Unlike trio’s CancelScope, there are no “shielded” scopes; you must use AsyncBackend.ignore_cancellation().

abstractmethod cancel() None

Request the Task to be cancelled.

This arranges for a backend.get_cancelled_exc_class() exception to be thrown into the wrapped coroutine on the next cycle of the event loop.

CancelScope.cancel() does not guarantee that the Task will be cancelled, although suppressing cancellation completely is not common and is actively discouraged.

abstractmethod cancel_called() bool

Checks if cancel() has been called.

Returns:

True if cancel() has been called.

Return type:

bool

abstractmethod cancelled_caught() bool

Returns the scope cancellation state.

Returns:

True if the scope has been cancelled.

Return type:

bool

abstractmethod when() float

Returns the current deadline.

Returns:

the absolute time in seconds. math.inf if the current deadline is not set.

Return type:

float

abstractmethod reschedule(when: float, /) None

Reschedules the timeout.

Parameters:

when (float) – The new deadline.

property deadline: float

A read-write attribute to simplify the timeout management.

For example, this statement:

scope.deadline += 30

is equivalent to:

scope.reschedule(scope.when() + 30)

It is also possible to remove the timeout by “deleting” the attribute:

del scope.deadline
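
One way such a property could be layered on top of when() and reschedule() is sketched below. ScopeSketch is a hypothetical class that only stores a number; it assumes that deleting the attribute resets the deadline to math.inf, which matches the "not set" value documented for when().

```python
import math


class ScopeSketch:
    """Hypothetical scope that only tracks its deadline."""

    def __init__(self, deadline: float = math.inf) -> None:
        self._deadline = deadline

    def when(self) -> float:
        return self._deadline

    def reschedule(self, when: float, /) -> None:
        self._deadline = when

    @property
    def deadline(self) -> float:
        return self.when()

    @deadline.setter
    def deadline(self, value: float) -> None:
        self.reschedule(value)

    @deadline.deleter
    def deadline(self) -> None:
        # Assumption: removing the timeout means "no deadline".
        self.reschedule(math.inf)


scope = ScopeSketch(deadline=100.0)
scope.deadline += 30       # same as scope.reschedule(scope.when() + 30)
extended = scope.when()
del scope.deadline         # removes the timeout
cleared = scope.when()
print(extended, cleared)
```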

Networking

DNS

async AsyncBackend.getaddrinfo(host: bytes | str | None, port: bytes | str | int | None, family: int = 0, type: int = 0, proto: int = 0, flags: int = 0) Sequence[tuple[int, int, int, str, tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes]]]

Asynchronous version of socket.getaddrinfo().

Return type:

Sequence[tuple[int, int, int, str, tuple[str, int] | tuple[str, int, int, int] | tuple[int, bytes]]]
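
As an illustration of the shape of the result, here is the asyncio counterpart, loop.getaddrinfo(), resolving a numeric address so that no DNS query is needed. The resolve() helper is hypothetical, and it is an assumption that a backend returns entries in the same layout as socket.getaddrinfo().

```python
import asyncio
import socket


async def resolve(host: str, port: int):
    loop = asyncio.get_running_loop()
    return await loop.getaddrinfo(
        host,
        port,
        family=socket.AF_INET,
        type=socket.SOCK_STREAM,
        flags=socket.AI_NUMERICHOST,  # numeric host only: no lookup performed
    )


infos = asyncio.run(resolve("127.0.0.1", 8080))
# Each entry is (family, type, proto, canonname, sockaddr).
family, _type, _proto, _canonname, sockaddr = infos[0]
print(family, sockaddr)
```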

async AsyncBackend.getnameinfo(sockaddr: tuple[str, int] | tuple[str, int, int, int], flags: int = 0) tuple[str, str]

Asynchronous version of socket.getnameinfo().

Return type:

tuple[str, str]

Opening Network Connections

abstractmethod async AsyncBackend.create_tcp_connection(host: str, port: int, *, local_address: tuple[str, int] | None = ..., happy_eyeballs_delay: float | None = ...) AsyncStreamTransport

Opens a connection using the TCP/IP protocol.

Parameters:
  • host (str) – The host IP/domain name.

  • port (int) – Port of connection.

  • local_address (tuple[str, int] | None) – If given, is a (local_host, local_port) tuple used to bind the socket locally.

  • happy_eyeballs_delay (float | None) – If given, is the “Connection Attempt Delay” as defined in RFC 8305.

Raises:
Returns:

A stream socket.

Return type:

AsyncStreamTransport

async AsyncBackend.create_unix_stream_connection(path: str | bytes, *, local_path: str | bytes | None = None) AsyncStreamTransport

Opens a connection to a Unix stream socket.

Added in version 1.1.

Parameters:
  • path (str | bytes) – Path of the socket to which the connection is made.

  • local_path (str | bytes | None) – If given, is a Unix socket address used to bind the socket locally.

Raises:
Returns:

A stream socket.

Return type:

AsyncStreamTransport

abstractmethod async AsyncBackend.wrap_stream_socket(socket: socket.socket) AsyncStreamTransport

Wraps an already connected SOCK_STREAM socket into an asynchronous stream socket.

Important

The returned stream socket takes the ownership of socket.

You should use AsyncStreamTransport.aclose() to close the socket.

Parameters:

socket (socket.socket) – The socket to wrap.

Raises:

ValueError – Invalid socket type or family.

Returns:

A stream socket.

Return type:

AsyncStreamTransport
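
The ownership rule is the same one that applies when handing a connected socket to asyncio. The sketch below uses asyncio.open_connection(sock=...) as a stand-in for wrap_stream_socket(); the socket pair simply plays the role of a connection obtained elsewhere.

```python
import asyncio
import socket


async def main() -> bytes:
    # A connected SOCK_STREAM pair stands in for a socket obtained elsewhere.
    ours, peer = socket.socketpair()
    ours.setblocking(False)

    # asyncio takes over `ours`; from here on it is closed through the writer,
    # not by calling ours.close() directly.
    reader, writer = await asyncio.open_connection(sock=ours)
    try:
        peer.sendall(b"ping\n")
        return await reader.readline()
    finally:
        writer.close()
        await writer.wait_closed()
        peer.close()


line = asyncio.run(main())
print(line)
```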

abstractmethod async AsyncBackend.create_udp_endpoint(remote_host: str, remote_port: int, *, local_address: tuple[str, int] | None = ..., family: int = ...) AsyncDatagramTransport

Opens an endpoint using the UDP/IP protocol.

Parameters:
  • remote_host (str) – The host IP/domain name.

  • remote_port (int) – Port of connection.

  • local_address (tuple[str, int] | None) – If given, is a (local_host, local_port) tuple used to bind the socket locally.

  • family (int) – The address family. Should be any of AF_UNSPEC, AF_INET or AF_INET6.

Raises:

OSError – unrelated OS error occurred.

Returns:

A datagram socket.

Return type:

AsyncDatagramTransport

async AsyncBackend.create_unix_datagram_endpoint(path: str | bytes, *, local_path: str | bytes | None = None) AsyncDatagramTransport

Opens an endpoint to a Unix datagram socket.

Added in version 1.1.

Parameters:
  • path (str | bytes) – Path of the socket to which the connection is made.

  • local_path (str | bytes | None) – If given, is a Unix socket address used to bind the socket locally.

Raises:

OSError – unrelated OS error occurred.

Returns:

A datagram socket.

Return type:

AsyncDatagramTransport

abstractmethod async AsyncBackend.wrap_connected_datagram_socket(socket: socket.socket) AsyncDatagramTransport

Wraps an already connected SOCK_DGRAM socket into an asynchronous datagram socket.

Important

The returned datagram socket takes the ownership of socket.

You should use AsyncDatagramTransport.aclose() to close the socket.

Parameters:

socket (socket.socket) – The socket to wrap.

Raises:

ValueError – Invalid socket type or family.

Returns:

A datagram socket.

Return type:

AsyncDatagramTransport

Creating Network Servers

abstractmethod async AsyncBackend.create_tcp_listeners(host: str | Sequence[str] | None, port: int, backlog: int, *, reuse_port: bool = ...) Sequence[AsyncListener[AsyncStreamTransport]]

Opens listener sockets for TCP connections.

Parameters:
  • host (str | Sequence[str] | None) –

    Can be set to several types which determine where the server would be listening:

    • If host is a string, the TCP server is bound to a single network interface specified by host.

    • If host is a sequence of strings, the TCP server is bound to all network interfaces specified by the sequence.

    • If host is None, all interfaces are assumed and a list of multiple sockets will be returned (most likely one for IPv4 and another one for IPv6).

  • port (int) – specify which port the server should listen on. If the value is 0, a random unused port will be selected (note that if host resolves to multiple network interfaces, a different random port will be selected for each interface).

  • backlog (int) – is the maximum number of queued connections passed to listen.

  • reuse_port (bool) – tells the kernel to allow this endpoint to be bound to the same port as other existing endpoints are bound to, so long as they all set this flag when being created. This option is not supported on Windows.

Raises:

OSError – unrelated OS error occurred.

Returns:

A sequence of listener sockets.

Return type:

Sequence[AsyncListener[AsyncStreamTransport]]
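
The effect of passing 0 as the port can be observed with asyncio.start_server(), used here purely as an analogue. The handler and the choice of the loopback address are illustrative assumptions.

```python
import asyncio


async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    writer.close()


async def main() -> list[int]:
    # port=0 asks the operating system for any unused port.
    server = await asyncio.start_server(handle, host="127.0.0.1", port=0, backlog=16)
    try:
        # One listening socket per bound interface; read the chosen ports
        # before closing, since the list is emptied afterwards.
        return [sock.getsockname()[1] for sock in server.sockets]
    finally:
        server.close()
        await server.wait_closed()


ports = asyncio.run(main())
print(ports)
```

Binding to several interfaces would yield several sockets, each of which may report a different port.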

async AsyncBackend.create_unix_stream_listener(path: str | bytes, backlog: int, *, mode: int | None = None) AsyncListener[AsyncStreamTransport]

Opens a listener socket for Unix stream connections.

Added in version 1.1.

Parameters:
  • path (str | bytes) – Path of the socket.

  • backlog (int) – is the maximum number of queued connections passed to listen.

  • mode (int | None) – Permissions to set on the socket.

Raises:

OSError – unrelated OS error occurred.

Returns:

A listener socket.

Return type:

AsyncListener[AsyncStreamTransport]

abstractmethod async AsyncBackend.create_udp_listeners(host: str | Sequence[str] | None, port: int, *, reuse_port: bool = ...) Sequence[AsyncDatagramListener[tuple[Any, ...]]]

Opens UDP endpoints.

Parameters:
  • host (str | Sequence[str] | None) –

    Can be set to several types which determine where the server would be listening:

    • If host is a string, the UDP server is bound to a single network interface specified by host.

    • If host is a sequence of strings, the UDP server is bound to all network interfaces specified by the sequence.

    • If host is None, all interfaces are assumed and a list of multiple sockets will be returned (most likely one for IPv4 and another one for IPv6).

  • port (int) – specify which port the server should listen on. If the value is 0, a random unused port will be selected (note that if host resolves to multiple network interfaces, a different random port will be selected for each interface).

  • reuse_port (bool) – If True, sets the SO_REUSEPORT socket option if supported.

Raises:

OSError – unrelated OS error occurred.

Returns:

A sequence of datagram listener sockets.

Return type:

Sequence[AsyncDatagramListener[tuple[Any, ...]]]

async AsyncBackend.create_unix_datagram_listener(path: str | bytes, *, mode: int | None = None) AsyncDatagramListener[str | bytes]

Opens a Unix datagram endpoint.

Added in version 1.1.

Parameters:
  • path (str | bytes) – Path of the socket.

  • mode (int | None) – Permissions to set on the socket.

Raises:

OSError – unrelated OS error occurred.

Returns:

A datagram listener socket.

Return type:

AsyncDatagramListener[str | bytes]

Synchronization Primitives

Locks

abstractmethod AsyncBackend.create_lock() ILock

Creates a Lock object for inter-task synchronization.

Returns:

A new Lock.

Return type:

ILock

AsyncBackend.create_fair_lock() ILock

Creates a Lock object for inter-task synchronization where tasks are guaranteed to acquire the lock in strict first-come-first-served order.

This means that it always goes to the task which has been waiting longest.

Added in version 1.1.

Returns:

A new fair Lock.

Return type:

ILock

protocol easynetwork.lowlevel.api_async.backend.abc.ILock

Bases: Protocol

A mutex lock for asynchronous tasks. Not thread-safe.

A lock can be used to guarantee exclusive access to a shared resource.

The preferred way to use a Lock is an async with statement:

lock = backend.create_lock()

# ... later
async with lock:
    # access shared state

which is equivalent to:

lock = backend.create_lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

Classes that implement this protocol must have the following methods / attributes:

abstractmethod async __aenter__() Any
Return type:

Any

abstractmethod async __aexit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None, /) Literal[False] | None
Return type:

Literal[False] | None

abstractmethod async acquire() Any

Acquires the lock.

This method waits until the lock is unlocked, then sets it to locked.

When more than one coroutine is blocked in acquire() waiting for the lock to be unlocked, only one coroutine eventually proceeds.

Return type:

Any

abstractmethod release() None

Releases the lock.

When the lock is locked, reset it to unlocked and return.

Raises:

RuntimeError – the lock is unlocked or the task does not have the lock ownership.

abstractmethod locked() bool

Returns True if the lock is locked.

Returns:

the lock state.

Return type:

bool

Events

abstractmethod AsyncBackend.create_event() IEvent

Creates an Event object for inter-task synchronization.

Returns:

A new Event.

Return type:

IEvent

protocol easynetwork.lowlevel.api_async.backend.abc.IEvent

Bases: Protocol

A waitable boolean value useful for inter-task synchronization. Not thread-safe.

An event object has an internal boolean flag, representing whether the event has happened yet. The flag is initially False, and the wait() method waits until the flag is True. If the flag is already True, then wait() returns immediately. (If the event has already happened, there’s nothing to wait for.) The set() method sets the flag to True, and wakes up any waiters.

This behavior is useful because it helps avoid race conditions and lost wakeups: it doesn’t matter whether set() gets called just before or after wait().

Classes that implement this protocol must have the following methods / attributes:

abstractmethod async wait() Any

Blocks until the internal flag value becomes True.

If it is already True, then this method returns immediately.

Return type:

Any

abstractmethod set() None

Sets the internal flag value to True, and wakes any waiting tasks.

abstractmethod is_set() bool
Returns:

the current value of the internal flag.

Return type:

bool
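
asyncio.Event follows the same contract, so it can illustrate why the order of set() and wait() does not matter. This is a sketch with the standard library, not with an object returned by create_event().

```python
import asyncio


async def main() -> list[str]:
    log = []

    # Case 1: set() happens before wait(); wait() returns immediately.
    early = asyncio.Event()
    early.set()
    await early.wait()
    log.append("early")

    # Case 2: wait() starts first and is woken up by a later set().
    late = asyncio.Event()

    async def waiter() -> None:
        await late.wait()
        log.append("late")

    task = asyncio.ensure_future(waiter())
    await asyncio.sleep(0)  # let the waiter block in wait()
    late.set()
    await task
    return log


log = asyncio.run(main())
print(log)
```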

Condition Variables

abstractmethod AsyncBackend.create_condition_var(lock: ILock | None = ...) ICondition

Creates a Condition variable object for inter-task synchronization.

If lock is given and not None, it should be a lock created by create_lock(). While it is guaranteed to work with a lock from create_lock(), it can be any other implementation (such as the lock returned by create_fair_lock()), but it can also refuse other implementations.

Generic code should expect the function to fail:

try:
    cond = backend.create_condition_var(backend.create_fair_lock())
except TypeError:
    # Cannot use a fair lock. Use the default implementation instead.
    cond = backend.create_condition_var()

Parameters:

lock (ILock | None) – The lock instance to use under the hood.

Raises:

TypeError – lock type is not supported.

Returns:

A new Condition.

Return type:

ICondition

protocol easynetwork.lowlevel.api_async.backend.abc.ICondition

Bases: ILock, Protocol

A classic condition variable, similar to threading.Condition.

Classes that implement this protocol must have the following methods / attributes:

abstractmethod notify(n: int = ..., /) None

Wake one or more tasks that are blocked in wait().

abstractmethod notify_all() None

Wake all tasks that are blocked in wait().

abstractmethod async wait() Any

Wait until notified.

Raises:

RuntimeError – The underlying lock is not held by this task.

Return type:

Any
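
A typical wait/notify exchange is sketched below with asyncio.Condition, which has the same shape as this protocol. The consumer and producer helpers are hypothetical; the final step shows the RuntimeError raised when wait() is called without holding the lock.

```python
import asyncio


async def main() -> tuple[str, bool]:
    cond = asyncio.Condition()
    items: list[str] = []

    async def consumer() -> str:
        async with cond:
            # Re-check the predicate after every wake-up.
            while not items:
                await cond.wait()
            return items.pop()

    async def producer() -> None:
        async with cond:
            items.append("job")
            cond.notify()

    task = asyncio.ensure_future(consumer())
    await asyncio.sleep(0)  # let the consumer block in wait()
    await producer()
    received = await task

    try:
        await cond.wait()  # the lock is not held here
    except RuntimeError:
        refused = True
    else:
        refused = False
    return received, refused


received, refused = asyncio.run(main())
print(received, refused)
```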

Concurrency And Multithreading

Running Blocking Code

abstractmethod async AsyncBackend.run_in_thread(func: Callable[[Unpack[_T_PosArgs]], _T], /, *args: Unpack[_T_PosArgs], abandon_on_cancel: bool = ...) _T

Executes a synchronous function in a worker thread.

This is useful to execute a long-running (or temporarily blocking) function and let other tasks run.

From inside the worker thread, you can get back into the scheduler loop using a ThreadsPortal. See create_threads_portal() for details.

Cancellation handling:

Because there is no way to “cancel” an arbitrary function call in an OS thread, once the job is started:

  • If abandon_on_cancel is False (the default), any cancellation requests will be discarded.

  • If abandon_on_cancel is True, the task will notify the thread to stop (if possible) then will bail out.

Warning

Due to the current coroutine implementation, func should not raise a StopIteration. This can lead to unexpected (and unwanted) behavior.

Parameters:
  • func (Callable[[Unpack[_T_PosArgs]], _T]) – A synchronous function.

  • args (Unpack[_T_PosArgs]) – Positional arguments to be passed to func. If you need to pass keyword arguments, then use functools.partial().

  • abandon_on_cancel (bool) – Whether to abandon the thread job when a cancellation request is received.

Raises:

Exception – Whatever func(*args) raises.

Returns:

Whatever func(*args) returns.

Return type:

_T
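
The idea of keeping the event loop responsive while blocking code runs elsewhere can be shown with asyncio's loop.run_in_executor(). This is an analogue only: it has no abandon_on_cancel option, and blocking_job and ticker are hypothetical helpers.

```python
import asyncio
import contextlib
import threading
import time


def blocking_job(n: int) -> tuple[int, int]:
    time.sleep(0.05)  # blocks the worker thread, not the event loop
    return n * 2, threading.get_ident()


async def main() -> tuple[int, bool, int]:
    loop = asyncio.get_running_loop()
    ticks = 0

    async def ticker() -> None:
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    background = asyncio.ensure_future(ticker())
    value, worker_ident = await loop.run_in_executor(None, blocking_job, 21)
    background.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await background

    ran_elsewhere = worker_ident != threading.get_ident()
    return value, ran_elsewhere, ticks


value, ran_elsewhere, ticks = asyncio.run(main())
print(value, ran_elsewhere, ticks)
```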

Scheduling From Other Threads

abstractmethod AsyncBackend.create_threads_portal() ThreadsPortal

Creates a portal for executing functions in the event loop thread for use in external threads.

Use this function in asynchronous code when you need to allow external threads access to the event loop where your asynchronous code is currently running.

Raises:

RuntimeError – not called in the event loop thread.

Returns:

a new thread portal.

Return type:

ThreadsPortal

class easynetwork.lowlevel.api_async.backend.abc.ThreadsPortal

Bases: object

An object that lets external threads run code in an asynchronous event loop.

You must use it as a context manager within the event loop to start the portal:

async with threads_portal:
    ...

If the portal has not been entered, or has already been exited, every operation raises a RuntimeError in the calling thread.

abstractmethod run_coroutine_soon(coro_func: Callable[_P, Awaitable[_T]], /, *args: _P.args, **kwargs: _P.kwargs) Future[_T]

Run the given async function in the bound event loop thread. Thread-safe.

Parameters:
  • coro_func (Callable[_P, Awaitable[_T]]) – An async function.

  • args (_P.args) – Positional arguments to be passed to coro_func.

  • kwargs (_P.kwargs) – Keyword arguments to be passed to coro_func.

Raises:
  • RuntimeError – if the portal is shut down.

  • RuntimeError – if you try calling this from inside the event loop thread, to avoid potential deadlocks.

Returns:

A future filled with the result of await coro_func(*args, **kwargs).

Return type:

Future[_T]

run_coroutine(coro_func: Callable[_P, Awaitable[_T]], /, *args: _P.args, **kwargs: _P.kwargs) _T

Run the given async function in the bound event loop thread, blocking until it is complete. Thread-safe.

The default implementation is equivalent to:

portal.run_coroutine_soon(coro_func, *args, **kwargs).result()

Parameters:
  • coro_func (Callable[_P, Awaitable[_T]]) – An async function.

  • args (_P.args) – Positional arguments to be passed to coro_func.

  • kwargs (_P.kwargs) – Keyword arguments to be passed to coro_func.

Raises:
  • concurrent.futures.CancelledError – The portal was shut down while coro_func() was running, which cancelled the task.

  • RuntimeError – if the portal is shut down.

  • RuntimeError – if you try calling this from inside the event loop thread, which would otherwise cause a deadlock.

  • Exception – Whatever await coro_func(*args, **kwargs) raises.

Returns:

Whatever await coro_func(*args, **kwargs) returns.

Return type:

_T

abstractmethod run_sync_soon(func: Callable[_P, _T], /, *args: _P.args, **kwargs: _P.kwargs) Future[_T]

Executes a function in the event loop thread from a worker thread. Thread-safe.

Parameters:
  • func (Callable[_P, _T]) – A synchronous function.

  • args (_P.args) – Positional arguments to be passed to func.

  • kwargs (_P.kwargs) – Keyword arguments to be passed to func.

Raises:
  • RuntimeError – if the portal is shut down.

  • RuntimeError – if you try calling this from inside the event loop thread, to avoid potential deadlocks.

Returns:

A future filled with the result of func(*args, **kwargs).

Return type:

Future[_T]

run_sync(func: Callable[_P, _T], /, *args: _P.args, **kwargs: _P.kwargs) _T

Executes a function in the event loop thread from a worker thread, blocking until it is complete. Thread-safe.

The default implementation is equivalent to:

portal.run_sync_soon(func, *args, **kwargs).result()

Parameters:
  • func (Callable[_P, _T]) – A synchronous function.

  • args (_P.args) – Positional arguments to be passed to func.

  • kwargs (_P.kwargs) – Keyword arguments to be passed to func.

Raises:
  • RuntimeError – if the portal is shut down.

  • RuntimeError – if you try calling this from inside the event loop thread, which would otherwise cause a deadlock.

  • Exception – Whatever func(*args, **kwargs) raises.

Returns:

Whatever func(*args, **kwargs) returns.

Return type:

_T
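
What a portal offers to worker threads resembles asyncio.run_coroutine_threadsafe(). The sketch below uses that function directly instead of a ThreadsPortal, so entering and exiting the portal is not modelled; double and worker are hypothetical helpers.

```python
import asyncio
import threading


async def double(x: int, loop_thread: int) -> tuple[int, bool]:
    await asyncio.sleep(0)
    # Report whether this coroutine really ran in the event loop thread.
    return x * 2, threading.get_ident() == loop_thread


def worker(loop: asyncio.AbstractEventLoop, loop_thread: int) -> tuple[int, bool]:
    # Runs in a worker thread: schedule the coroutine in the loop thread,
    # then block this thread (not the loop) until the result is ready.
    future = asyncio.run_coroutine_threadsafe(double(21, loop_thread), loop)
    return future.result(timeout=5)


async def main() -> tuple[int, bool]:
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, worker, loop, threading.get_ident())


value, ran_in_loop_thread = asyncio.run(main())
print(value, ran_in_loop_thread)
```

Calling future.result() from the event loop thread itself would block the loop that has to produce the result, which is the deadlock the RuntimeError entries above guard against.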

Useful Tools

easynetwork.lowlevel.api_async.backend.utils.BuiltinAsyncBackendLiteral

Supported asynchronous framework names.

Alias of Literal['asyncio', 'trio']

easynetwork.lowlevel.api_async.backend.utils.ensure_backend(backend: AsyncBackend | Literal['asyncio', 'trio'] | None) AsyncBackend

Obtain an interface for the given backend.

  • If backend is already an AsyncBackend, this object is returned.

  • If backend is a string token and matches one of the built-in implementations, a new object is returned.

  • If None, the function tries to guess the library currently used with sniffio.

Raises:
Return type:

AsyncBackend
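
The three cases listed above amount to a small dispatch. The sketch below is hypothetical throughout: BackendSketch is a placeholder class, detect_library_sketch() stands in for sniffio and only recognises asyncio, and the ValueError raised for unknown names is an assumption, since the exceptions of the real function are not listed here.

```python
import asyncio


class BackendSketch:
    """Hypothetical stand-in for an AsyncBackend implementation."""

    def __init__(self, name: str) -> None:
        self.name = name


BUILTIN_NAMES = ("asyncio", "trio")


def detect_library_sketch() -> str:
    # Stand-in for sniffio: only a running asyncio loop is recognised.
    asyncio.get_running_loop()  # raises RuntimeError outside of a loop
    return "asyncio"


def ensure_backend_sketch(backend):
    if isinstance(backend, BackendSketch):
        return backend                     # already a backend: returned as-is
    if backend is None:
        backend = detect_library_sketch()  # guess from the running library
    if backend not in BUILTIN_NAMES:
        raise ValueError(f"unknown backend name: {backend!r}")  # assumption
    return BackendSketch(backend)          # string token: new object


async def main() -> BackendSketch:
    return ensure_backend_sketch(None)


existing = BackendSketch("trio")
same = ensure_backend_sketch(existing)
created = ensure_backend_sketch("asyncio")
guessed = asyncio.run(main())
print(same is existing, created.name, guessed.name)
```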

easynetwork.lowlevel.api_async.backend.utils.new_builtin_backend(name: Literal['asyncio', 'trio']) AsyncBackend

Obtain an interface for the given backend.

Here is the list of the supported libraries:

  • "asyncio"

  • "trio"

Return type:

AsyncBackend