Concurrency And Multithreading (concurrent.futures Integration)

Asynchronous backend engine bindings with concurrent.futures module.

class easynetwork.lowlevel.futures.AsyncExecutor

Bases: Generic[_T_Executor]

Wraps a concurrent.futures.Executor instance.

For example, this code:

from concurrent.futures import ProcessPoolExecutor, wait

def main() -> None:
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(pow, a, b) for a, b in [(3, 4), (12, 2), (6, 8)]]
        wait(futures)
        results = [f.result() for f in futures]

can be converted to:

from concurrent.futures import ProcessPoolExecutor

async def main() -> None:
    ...

    async with AsyncExecutor(ProcessPoolExecutor(), handle_contexts=False) as executor:
        async with backend.create_task_group() as task_group:
            tasks = [await task_group.start(executor.run, pow, a, b) for a, b in [(3, 4), (12, 2), (6, 8)]]
        results = [await t.join() for t in tasks]
__init__(executor: _T_Executor, backend: AsyncBackend | Literal['asyncio', 'trio'] | None = None, *, handle_contexts: bool = True) None
Parameters:
async __aexit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) None

Calls shutdown().

async run(func: ~collections.abc.Callable[[~_P], ~easynetwork.lowlevel.futures._T], /, *args: ~typing.~_P, **kwargs: ~typing.~_P) _T

Executes func(*args, **kwargs) in the executor, blocking until it is complete.

Example:

async with AsyncExecutor(ThreadPoolExecutor(max_workers=1)) as executor:
    result = await executor.run(pow, 323, 1235)

Warning

Due to the current coroutine implementation, func should not raise a StopIteration. This can lead to unexpected (and unwanted) behavior.

Parameters:
  • func (Callable[[~_P], _T]) – A synchronous function.

  • args (~_P) – Positional arguments to be passed to func.

  • kwargs (~_P) – Keyword arguments to be passed to func.

Raises:
Returns:

Whatever returns func(*args, **kwargs).

Return type:

_T

map(func: Callable[[...], _T], *iterables: Iterable[Any]) AsyncGenerator[_T]

Returns an asynchronous iterator equivalent to map(fn, iter).

Example:

def pow_50(x):
    return x**50

async with AsyncExecutor(ProcessPoolExecutor(), handle_contexts=False) as executor:
    results = [result async for result in executor.map(pow_50, (1, 4, 12))]
Parameters:
  • func (Callable[[...], _T]) – A callable that will take as many arguments as there are passed iterables.

  • iterables (Iterable[Any]) – iterables yielding arguments for func.

Raises:

Exception – If fn(*args) raises for any values.

Returns:

An asynchronous iterator equivalent to map(func, *iterables) but the calls may be evaluated out-of-order.

Return type:

AsyncGenerator[_T]

shutdown_nowait(*, cancel_futures: bool = False) None

Signal the executor that it should free any resources that it is using when the currently pending futures are done executing.

Calls to AsyncExecutor.run() made after shutdown will raise RuntimeError.

Parameters:

cancel_futures (bool) – If True, this method will cancel all pending futures that the executor has not started running. Any futures that are completed or running won’t be cancelled, regardless of the value of cancel_futures.

async shutdown(*, cancel_futures: bool = False) None

Signal the executor that it should free any resources that it is using when the currently pending futures are done executing.

Calls to AsyncExecutor.run() made after shutdown will raise RuntimeError.

This method will block until all the pending futures are done executing and the resources associated with the executor have been freed.

Parameters:

cancel_futures (bool) – If True, this method will cancel all pending futures that the executor has not started running. Any futures that are completed or running won’t be cancelled, regardless of the value of cancel_futures.

backend() AsyncBackend
Returns:

The backend implementation linked to this object.

Return type:

AsyncBackend

property wrapped: _T_Executor

The wrapped Executor instance. Read-only attribute.

async easynetwork.lowlevel.futures.unwrap_future(future: Future[_T], backend: AsyncBackend) _T

Blocks until the future is done, and returns the result.

Cancellation handling:

Parameters:
Raises:
Returns:

Whatever returns future.result()

Return type:

_T