How-to — Multithreading Integration In Servers

Note

This page uses two different API variants:

  • Synchronous API with classic def functions, usable in any context.

  • Asynchronous API with async def functions, using an asynchronous framework to perform I/O operations.

All asynchronous API examples assume that you are using either asyncio or trio, but you can use a different library thanks to the asynchronous backend engine API.


Run Blocking Functions In A Worker Thread

You can run IO-bound functions in another OS thread and await the result:

 1async def handle(
 2    self,
 3    client: AsyncStreamClient[Response],
 4) -> AsyncGenerator[None, Request]:
 5    request: Request = yield
 6
 7    response = await asyncio.to_thread(self._data_processing, request)
 8
 9    await client.send_packet(response)
10
11def _data_processing(self, request: Request) -> Response:
12    # Simulate long computing
13    time.sleep(1)
14
15    return Response()

See also

The asyncio.to_thread() coroutine.

Use A Custom Thread Pool

Instead of using the scheduler’s global thread pool, you can (and should) have your own thread pool:

 1async def service_init(self, exit_stack: AsyncExitStack, server: AsyncTCPNetworkServer[Request, Response]) -> None:
 2    from concurrent.futures import ThreadPoolExecutor
 3
 4    from easynetwork.lowlevel.futures import AsyncExecutor
 5
 6    # 4 worker threads for the demo
 7    self.executor = AsyncExecutor(ThreadPoolExecutor(max_workers=4), server.backend())
 8    # Shut down executor at server stop
 9    await exit_stack.enter_async_context(self.executor)
10
11async def handle(
12    self,
13    client: AsyncStreamClient[Response],
14) -> AsyncGenerator[None, Request]:
15    request: Request = yield
16
17    response = await self.executor.run(self._data_processing, request)
18
19    await client.send_packet(response)
20
21def _data_processing(self, request: Request) -> Response:
22    # Simulate long computing
23    time.sleep(1)
24
25    return Response()

See also

The AsyncExecutor class.

Allow Access To The Scheduler Loop From Within A Thread

There are many ways provided by your asynchronous framework to get back from a thread to the scheduler loop. However, the simplest way is to use the provided ThreadsPortal interface:

 1async def service_init(self, exit_stack: AsyncExitStack, server: AsyncTCPNetworkServer[Request, Response]) -> None:
 2    from concurrent.futures import ThreadPoolExecutor
 3
 4    from easynetwork.lowlevel.futures import AsyncExecutor
 5
 6    # 4 worker threads for the demo
 7    self.executor = AsyncExecutor(ThreadPoolExecutor(max_workers=4), server.backend())
 8    await exit_stack.enter_async_context(self.executor)
 9
10    # Create a portal to execute code from external threads in the scheduler loop
11    self.portal = server.backend().create_threads_portal()
12    await exit_stack.enter_async_context(self.portal)

Calling asynchronous code from a worker thread

If you need to call a coroutine function from a worker thread, you can do this:

 1async def handle(
 2    self,
 3    client: AsyncStreamClient[Response],
 4) -> AsyncGenerator[None, Request]:
 5    request: Request = yield
 6
 7    response = await self.executor.run(self._data_processing, request)
 8
 9    await client.send_packet(response)
10
11def _data_processing(self, request: Request) -> Response:
12    # Get back in scheduler loop for 1 second
13    backend = self.executor.backend()
14    self.portal.run_coroutine(backend.sleep, 1)
15
16    return Response()

Calling synchronous code from a worker thread

Occasionally you may need to call synchronous code in the event loop thread from a worker thread. Common cases include setting asynchronous events or sending data to a stream. Because these methods aren’t thread safe, you need to arrange them to be called inside the event loop thread using run_sync():

 1async def handle(
 2    self,
 3    client: AsyncStreamClient[Response],
 4) -> AsyncGenerator[None, Request]:
 5    request: Request = yield
 6
 7    event = client.backend().create_event()
 8
 9    self.executor.wrapped.submit(self._blocking_wait, event)
10    await event.wait()
11
12    await client.send_packet(Response())
13
14def _blocking_wait(self, event: IEvent) -> None:
15    time.sleep(1)
16
17    # Thread-safe flag set
18    self.portal.run_sync(event.set)

Spawning tasks from worker threads

When you need to spawn a task to be run in the background, you can do so using run_coroutine_soon():

 1async def handle(
 2    self,
 3    client: AsyncStreamClient[Response],
 4) -> AsyncGenerator[None, Request]:
 5    request: Request = yield
 6
 7    await self.executor.run(self._blocking_wait)
 8
 9    await client.send_packet(Response())
10
11def _blocking_wait(self) -> None:
12    sleep = self.executor.backend().sleep
13
14    async def long_running_task(index: int) -> str:
15        await sleep(1)
16        print(f"Task {index} running...")
17        await sleep(index)
18        return f"Task {index} return value"
19
20    # Spawn several tasks
21    from concurrent.futures import as_completed
22
23    futures = [self.portal.run_coroutine_soon(long_running_task, i) for i in range(1, 5)]
24    for future in as_completed(futures):
25        print(future.result())

Cancelling tasks spawned this way can be done by cancelling the returned Future.