How-to — Multithreading Integration In Servers
Note
This page uses two different API variants:
Synchronous API with classic def functions, usable in any context.
Asynchronous API with async def functions, using an asynchronous framework to perform I/O operations.
All asynchronous API examples assume that you are using either asyncio or trio,
but you can use a different library thanks to the asynchronous backend engine API.
Run Blocking Functions In A Worker Thread
You can run blocking (I/O-bound) functions in another OS thread and await the result. The three variants below use asyncio, trio, and the backend API, in that order:
async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    response = await asyncio.to_thread(self._data_processing, request)

    await client.send_packet(response)

def _data_processing(self, request: Request) -> Response:
    # Simulate long computing
    time.sleep(1)

    return Response()
See also
The asyncio.to_thread() coroutine.
async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    response = await trio.to_thread.run_sync(self._data_processing, request)

    await client.send_packet(response)

def _data_processing(self, request: Request) -> Response:
    # Simulate long computing
    time.sleep(1)

    return Response()
See also
The trio.to_thread.run_sync() coroutine.
async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    response = await client.backend().run_in_thread(self._data_processing, request)

    await client.send_packet(response)

def _data_processing(self, request: Request) -> Response:
    # Simulate long computing
    time.sleep(1)

    return Response()
See also
The AsyncBackend.run_in_thread() coroutine.
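To see why offloading matters, here is a minimal standalone sketch that uses only the standard library (no easynetwork objects). The names data_processing, heartbeat, and main are hypothetical and exist only for this illustration. It shows that a background task keeps ticking while the blocking call runs in a worker thread:

```python
import asyncio
import contextlib
import time


def data_processing(request: str) -> str:
    # Stand-in for a blocking call (hypothetical workload)
    time.sleep(0.2)
    return request.upper()


async def heartbeat(ticks: list[int]) -> None:
    # This task only makes progress while the event loop is not blocked
    while True:
        await asyncio.sleep(0.01)
        ticks.append(1)


async def main() -> tuple[str, int]:
    ticks: list[int] = []
    task = asyncio.create_task(heartbeat(ticks))

    # The blocking function runs in a worker thread; the loop stays free
    response = await asyncio.to_thread(data_processing, "ping")

    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task
    return response, len(ticks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If data_processing were called directly inside main, the heartbeat task would most likely record no ticks at all, because time.sleep() would hold the event loop thread for the whole duration.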
Use A Custom Thread Pool
Instead of using the scheduler’s global thread pool, you can (and should) have your own thread pool:
async def service_init(self, exit_stack: AsyncExitStack, server: AsyncTCPNetworkServer[Request, Response]) -> None:
    from concurrent.futures import ThreadPoolExecutor

    from easynetwork.lowlevel.futures import AsyncExecutor

    # 4 worker threads for the demo
    self.executor = AsyncExecutor(ThreadPoolExecutor(max_workers=4), server.backend())
    # Shut down executor at server stop
    await exit_stack.enter_async_context(self.executor)

async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    response = await self.executor.run(self._data_processing, request)

    await client.send_packet(response)

def _data_processing(self, request: Request) -> Response:
    # Simulate long computing
    time.sleep(1)

    return Response()
See also
The AsyncExecutor class.
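The same idea can be sketched with the standard library alone, assuming plain asyncio and loop.run_in_executor() as a stand-in for AsyncExecutor (this is not the easynetwork API). The pool size, the "demo-worker" prefix, and the function names are arbitrary choices for the illustration. A dedicated pool caps how many blocking calls run at once and keeps them separate from the default executor:

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def data_processing(request: int) -> str:
    # Stand-in for a blocking call; report which thread handled it
    time.sleep(0.05)
    return threading.current_thread().name


async def main() -> set[str]:
    loop = asyncio.get_running_loop()

    # Dedicated pool: 2 workers, recognizable by their name prefix
    with ThreadPoolExecutor(max_workers=2, thread_name_prefix="demo-worker") as pool:
        names = await asyncio.gather(
            *(loop.run_in_executor(pool, data_processing, i) for i in range(6))
        )
    return set(names)


if __name__ == "__main__":
    print(sorted(asyncio.run(main())))
```

Six requests are submitted, but at most two threads ever serve them, so a burst of slow requests cannot exhaust threads that other parts of the program rely on.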
Allow Access To The Scheduler Loop From Within A Thread
Your asynchronous framework provides several ways to get back from a thread to the scheduler loop.
The simplest one, however, is the ThreadsPortal interface provided by the backend:
async def service_init(self, exit_stack: AsyncExitStack, server: AsyncTCPNetworkServer[Request, Response]) -> None:
    from concurrent.futures import ThreadPoolExecutor

    from easynetwork.lowlevel.futures import AsyncExecutor

    # 4 worker threads for the demo
    self.executor = AsyncExecutor(ThreadPoolExecutor(max_workers=4), server.backend())
    await exit_stack.enter_async_context(self.executor)

    # Create a portal to execute code from external threads in the scheduler loop
    self.portal = server.backend().create_threads_portal()
    await exit_stack.enter_async_context(self.portal)
Calling asynchronous code from a worker thread
If you need to call a coroutine function from a worker thread, use run_coroutine():
async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    response = await self.executor.run(self._data_processing, request)

    await client.send_packet(response)

def _data_processing(self, request: Request) -> Response:
    # Get back in scheduler loop for 1 second
    backend = self.executor.backend()
    self.portal.run_coroutine(backend.sleep, 1)

    return Response()
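For comparison, this is roughly what the framework-specific route looks like with plain asyncio, assuming asyncio.run_coroutine_threadsafe() as the mechanism (a standard-library analogue, not the ThreadsPortal implementation). fetch_setting and data_processing are hypothetical names used only here:

```python
import asyncio


async def fetch_setting(name: str) -> str:
    # Hypothetical coroutine that must run in the event loop
    await asyncio.sleep(0.01)
    return f"{name}=42"


def data_processing(loop: asyncio.AbstractEventLoop) -> str:
    # Runs in a worker thread: schedule the coroutine on the loop,
    # then block this thread until the result is available.
    future = asyncio.run_coroutine_threadsafe(fetch_setting("timeout"), loop)
    return future.result(timeout=5)


async def main() -> str:
    loop = asyncio.get_running_loop()
    return await asyncio.to_thread(data_processing, loop)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that the worker thread has to be handed the loop explicitly. A portal object stored on the handler removes that bookkeeping and works the same way regardless of the underlying framework.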
Calling synchronous code from a worker thread
Occasionally, you may need to call synchronous code in the event loop thread from a worker thread.
Common cases include setting asynchronous events or sending data to a stream. Because these methods are not thread-safe,
you need to arrange for them to be called inside the event loop thread using run_sync():
async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    event = client.backend().create_event()

    self.executor.wrapped.submit(self._blocking_wait, event)
    await event.wait()

    await client.send_packet(Response())

def _blocking_wait(self, event: IEvent) -> None:
    time.sleep(1)

    # Thread-safe flag set
    self.portal.run_sync(event.set)
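The underlying pattern can be sketched with plain asyncio, assuming loop.call_soon_threadsafe() as the hand-off mechanism (a standard-library analogue of run_sync(), which unlike the portal does not return the callback's result). The names blocking_wait and main are hypothetical:

```python
import asyncio
import threading
import time


def blocking_wait(loop: asyncio.AbstractEventLoop, event: asyncio.Event) -> None:
    time.sleep(0.05)

    # asyncio.Event.set() is not thread-safe, so ask the loop thread to call it
    loop.call_soon_threadsafe(event.set)


async def main() -> bool:
    loop = asyncio.get_running_loop()
    event = asyncio.Event()

    thread = threading.Thread(target=blocking_wait, args=(loop, event))
    thread.start()

    # The loop stays responsive while waiting for the worker's signal
    await asyncio.wait_for(event.wait(), timeout=5)
    thread.join()
    return event.is_set()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Calling event.set() directly from the worker thread might appear to work in some runs, but it bypasses the loop's wake-up mechanism, so the waiting task is not guaranteed to be resumed promptly.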
Spawning tasks from worker threads
When you need to spawn a task to be run in the background, you can do so using run_coroutine_soon():
async def handle(
    self,
    client: AsyncStreamClient[Response],
) -> AsyncGenerator[None, Request]:
    request: Request = yield

    await self.executor.run(self._blocking_wait)

    await client.send_packet(Response())

def _blocking_wait(self) -> None:
    sleep = self.executor.backend().sleep

    async def long_running_task(index: int) -> str:
        await sleep(1)
        print(f"Task {index} running...")
        await sleep(index)
        return f"Task {index} return value"

    # Spawn several tasks
    from concurrent.futures import as_completed

    futures = [self.portal.run_coroutine_soon(long_running_task, i) for i in range(1, 5)]
    for future in as_completed(futures):
        print(future.result())
To cancel a task spawned this way, cancel the returned Future.
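As an illustration of cancelling through the future, here is a minimal sketch with plain asyncio, assuming asyncio.run_coroutine_threadsafe() in place of run_coroutine_soon() (a standard-library analogue; the portal's own cancellation semantics may differ in detail). All function names are hypothetical:

```python
import asyncio
import threading


async def long_running_task(started: threading.Event, cancelled: asyncio.Event) -> str:
    started.set()
    try:
        await asyncio.sleep(60)
        return "finished"
    except asyncio.CancelledError:
        # Record that the cancellation reached the task, then propagate it
        cancelled.set()
        raise


def worker(loop: asyncio.AbstractEventLoop, cancelled: asyncio.Event) -> bool:
    started = threading.Event()
    future = asyncio.run_coroutine_threadsafe(long_running_task(started, cancelled), loop)

    # Wait until the task is actually running, then cancel it from this thread
    started.wait(timeout=5)
    return future.cancel()


async def main() -> tuple[bool, bool]:
    loop = asyncio.get_running_loop()
    cancelled = asyncio.Event()

    accepted = await asyncio.to_thread(worker, loop, cancelled)
    await asyncio.wait_for(cancelled.wait(), timeout=5)
    return accepted, cancelled.is_set()


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The cancel request is issued in the worker thread, but the task itself observes it inside the event loop as a CancelledError at its next await point.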