Concurrency And Multithreading (concurrent.futures Integration)
Asynchronous backend engine bindings with the concurrent.futures module.
- class easynetwork.lowlevel.futures.AsyncExecutor
Bases: Generic[_T_Executor]
Wraps a concurrent.futures.Executor instance.
For example, this code:
from concurrent.futures import ProcessPoolExecutor, wait

def main() -> None:
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(pow, a, b) for a, b in [(3, 4), (12, 2), (6, 8)]]
        wait(futures)
        results = [f.result() for f in futures]
can be converted to:
from concurrent.futures import ProcessPoolExecutor

from easynetwork.lowlevel.futures import AsyncExecutor

async def main() -> None:
    ...
    async with AsyncExecutor(ProcessPoolExecutor(), handle_contexts=False) as executor:
        async with backend.create_task_group() as task_group:
            tasks = [await task_group.start(executor.run, pow, a, b) for a, b in [(3, 4), (12, 2), (6, 8)]]
        results = [await t.join() for t in tasks]
- __init__(executor: _T_Executor, backend: AsyncBackend | Literal['asyncio', 'trio'] | None = None, *, handle_contexts: bool = True) -> None
- Parameters:
executor (_T_Executor) – The executor instance to wrap.
backend (AsyncBackend | Literal['asyncio', 'trio'] | None) – The asynchronous backend interface to use.
handle_contexts (bool) – If True (the default), contexts (contextvars.Context) are properly propagated to workers. Set it to False if the executor does not support the use of contexts (e.g. concurrent.futures.ProcessPoolExecutor).
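To make the handle_contexts parameter more concrete, here is a minimal standard-library sketch of what propagating a contextvars.Context to a worker can look like. It is an illustration only, not necessarily how AsyncExecutor does it; request_id, read_request_id and submit_with_context are hypothetical names. The approach relies on handing a Context object to the worker, which presumably explains why a process pool is named as an executor that does not support contexts.

```python
import contextvars
from concurrent.futures import Future, ThreadPoolExecutor

# Hypothetical context variable, used only for this illustration.
request_id = contextvars.ContextVar("request_id", default="unset")


def read_request_id() -> str:
    """Runs in the worker and reports the value it can see."""
    return request_id.get()


def submit_with_context(executor: ThreadPoolExecutor, func, *args) -> Future:
    """Submit func so that it runs inside a copy of the caller's context."""
    ctx = contextvars.copy_context()  # snapshot of the caller's variables
    return executor.submit(ctx.run, func, *args)


def demo() -> str:
    request_id.set("abc-123")
    with ThreadPoolExecutor(max_workers=1) as executor:
        # A plain executor.submit(read_request_id) usually sees the default
        # value instead, because worker threads normally start with their
        # own context.
        return submit_with_context(executor, read_request_id).result()


if __name__ == "__main__":
    print(demo())
```

The snapshot is taken at submission time, so later changes made by the caller are not visible to the worker.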
- async __aexit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None) -> None
Calls shutdown().
- async run(func: Callable[_P, _T], /, *args: _P.args, **kwargs: _P.kwargs) -> _T
Executes func(*args, **kwargs) in the executor, blocking until it is complete.
Example:
async with AsyncExecutor(ThreadPoolExecutor(max_workers=1)) as executor:
    result = await executor.run(pow, 323, 1235)
Warning
Due to the current coroutine implementation, func should not raise a StopIteration. This can lead to unexpected (and unwanted) behavior.
- Parameters:
func (Callable[_P, _T]) – A synchronous function.
args (_P.args) – Positional arguments to be passed to func.
kwargs (_P.kwargs) – Keyword arguments to be passed to func.
- Raises:
RuntimeError – if the executor is closed.
concurrent.futures.CancelledError – if the executor is shutting down and the pending task has been cancelled.
Exception – Whatever func(*args, **kwargs) raises.
- Returns:
Whatever func(*args, **kwargs) returns.
- Return type:
_T
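The behaviour documented for run() (return the result, re-raise the function's exception, fail with RuntimeError once the executor is closed) mirrors what a plain executor does. The sketch below reproduces it with asyncio and the standard library only. It is an assumption-laden illustration, not the library's implementation, and run_sketch is a hypothetical helper.

```python
import asyncio
from concurrent.futures import Executor, ThreadPoolExecutor


async def run_sketch(executor: Executor, func, /, *args, **kwargs):
    """Run func in the executor and wait for it without blocking the loop."""
    # submit() raises RuntimeError if the executor has been shut down.
    future = executor.submit(func, *args, **kwargs)
    # wrap_future() turns the concurrent future into an awaitable one.
    return await asyncio.wrap_future(future)


async def main():
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        value = await run_sketch(executor, pow, 3, 4)
        try:
            await run_sketch(executor, int, "not a number")
            error_name = None
        except ValueError as exc:
            # The exception raised in the worker reaches the awaiting code.
            error_name = type(exc).__name__
    finally:
        # Nothing is pending at this point, so this returns quickly.
        executor.shutdown()
    try:
        await run_sketch(executor, pow, 2, 2)
        rejected = False
    except RuntimeError:
        rejected = True
    return value, error_name, rejected


if __name__ == "__main__":
    print(asyncio.run(main()))
```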
- map(func: Callable[[...], _T], *iterables: Iterable[Any]) -> AsyncGenerator[_T]
Returns an asynchronous iterator equivalent to map(func, *iterables).
Example:
def pow_50(x):
    return x**50

async with AsyncExecutor(ProcessPoolExecutor(), handle_contexts=False) as executor:
    results = [result async for result in executor.map(pow_50, (1, 4, 12))]
- Parameters:
- Raises:
Exception – If func(*args) raises for any values.
- Returns:
An asynchronous iterator equivalent to map(func, *iterables) but the calls may be evaluated out-of-order.
- Return type:
AsyncGenerator[_T]
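One way to read "equivalent to map(func, *iterables) but the calls may be evaluated out-of-order" is: every call is submitted up front, the workers run them in whatever order they like, and the results are still yielded in input order. The following standard-library sketch works that way. It is a guess at the semantics, not the library's code; async_map_sketch is a hypothetical name, and a thread pool is used so that the snippet runs anywhere.

```python
import asyncio
from collections.abc import AsyncGenerator, Callable, Iterable
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Any, TypeVar

_T = TypeVar("_T")


async def async_map_sketch(
    executor: Executor,
    func: Callable[..., _T],
    *iterables: Iterable[Any],
) -> AsyncGenerator[_T, None]:
    """Yield func(*args) for each tuple of arguments, in input order."""
    # All calls are scheduled immediately; workers may finish in any order.
    futures = [executor.submit(func, *args) for args in zip(*iterables)]
    try:
        for future in futures:
            yield await asyncio.wrap_future(future)
    finally:
        # If the consumer stops early, drop the calls that have not started.
        for future in futures:
            future.cancel()


def pow_50(x: int) -> int:
    return x**50


async def main() -> list[int]:
    with ThreadPoolExecutor() as executor:
        return [r async for r in async_map_sketch(executor, pow_50, (1, 4, 12))]


if __name__ == "__main__":
    print(asyncio.run(main()))
```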
- shutdown_nowait(*, cancel_futures: bool = False) -> None
Signal the executor that it should free any resources that it is using when the currently pending futures are done executing.
Calls to AsyncExecutor.run() made after shutdown will raise RuntimeError.
- async shutdown(*, cancel_futures: bool = False) -> None
Signal the executor that it should free any resources that it is using when the currently pending futures are done executing.
Calls to AsyncExecutor.run() made after shutdown will raise RuntimeError.
This method will block until all the pending futures are done executing and the resources associated with the executor have been freed.
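The two shutdown methods read like the wait=False and wait=True forms of concurrent.futures.Executor.shutdown(). That mapping is an assumption made for this sketch, which uses only the standard library to show what cancel_futures and a closed executor look like in practice. demo_shutdown and blocking_job are hypothetical names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def demo_shutdown():
    started = threading.Event()
    release = threading.Event()

    def blocking_job() -> str:
        started.set()            # tell the caller the job is running
        release.wait(timeout=5)  # stay busy until the caller lets go
        return "finished"

    executor = ThreadPoolExecutor(max_workers=1)
    running = executor.submit(blocking_job)
    started.wait(timeout=5)
    # The single worker is busy, so this second call stays queued.
    pending = executor.submit(pow, 2, 10)

    # Non-blocking shutdown: queued work is cancelled, running work continues.
    executor.shutdown(wait=False, cancel_futures=True)
    release.set()

    try:
        executor.submit(pow, 2, 2)
        rejected = False
    except RuntimeError:
        # New submissions are refused once shutdown has been requested.
        rejected = True

    return running.result(timeout=5), pending.cancelled(), rejected


if __name__ == "__main__":
    print(demo_shutdown())
```

The job that was already running completes normally, the queued one is cancelled, and the late submission is refused.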
- backend() -> AsyncBackend
- Returns:
The backend implementation linked to this object.
- Return type:
AsyncBackend
- async easynetwork.lowlevel.futures.unwrap_future(future: Future[_T], backend: AsyncBackend) -> _T
Blocks until the future is done, and returns the result.
Cancellation handling:
- unwrap_future() tries to cancel the given future (using concurrent.futures.Future.cancel()).
  - If the future has been effectively cancelled, the cancellation request is “accepted” and propagated.
  - Otherwise, the cancellation request is “rejected”: unwrap_future() will block until future is done, and will ignore any further cancellation request.
- A coroutine awaiting a future in running state (concurrent.futures.Future.running() returns True) cannot be cancelled.
- Parameters:
future (Future[_T]) – The future object to wait for.
backend (AsyncBackend) – The asynchronous backend interface to use.
- Raises:
concurrent.futures.CancelledError – the future has been unexpectedly cancelled by external code (typically concurrent.futures.Executor.shutdown()).
Exception – If future.exception() does not return None, this exception is raised.
- Returns:
Whatever future.result() returns.
- Return type:
_T
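The accept/reject cancellation rules described for unwrap_future() can be imitated with asyncio alone. The sketch below is a simplified, asyncio-only approximation written for illustration. It is not the library's implementation, it takes no backend argument, and wait_future_sketch is a hypothetical name.

```python
import asyncio
from concurrent.futures import Future


async def wait_future_sketch(future: Future):
    """Wait for a concurrent future, applying the accept/reject rule."""
    wrapped = asyncio.wrap_future(future)
    while True:
        try:
            # shield() keeps a cancellation of this coroutine from
            # cancelling the wrapped future directly.
            return await asyncio.shield(wrapped)
        except asyncio.CancelledError:
            if future.cancel():
                raise  # accepted: the future really was cancelled
            # Rejected: the future is already running, so keep waiting and
            # ignore this (and any later) cancellation request.


async def main():
    # Case 1: the future is still pending, so cancellation is accepted.
    pending = Future()
    task = asyncio.create_task(wait_future_sketch(pending))
    await asyncio.sleep(0)
    task.cancel()
    try:
        await task
        accepted = False
    except asyncio.CancelledError:
        accepted = True

    # Case 2: the future is running, so cancellation is rejected.
    running = Future()
    running.set_running_or_notify_cancel()  # moves the future to "running"
    task = asyncio.create_task(wait_future_sketch(running))
    await asyncio.sleep(0)
    task.cancel()
    await asyncio.sleep(0)
    still_waiting = not task.done()
    running.set_result(42)
    result = await task
    return accepted, still_waiting, result


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In the second case the task survives the cancel() call and only finishes once the future has a result.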