Run Asyncio Event Loop in another thread
Mar 14, 2020

When a web framework does not support writing concurrent code with the async/await syntax, you may still want concurrency to speed up communication with other services, such as connecting to Redis or making a large number of requests. That is why we need to run the asyncio event loop in another thread.

Create a thread that runs asyncio event loop forever

Note: this example is not fully tested.

import asyncio
import threading

class AsyncioEventLoopThread(threading.Thread):
    def __init__(self, *args, loop=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.loop = loop or asyncio.new_event_loop()
        self.running = False

    def run(self):
        self.running = True
        self.loop.run_forever()

    def run_coro(self, coro):
        return asyncio.run_coroutine_threadsafe(coro, loop=self.loop).result()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.join()
        self.running = False

Because asyncio.run_coroutine_threadsafe returns a concurrent.futures.Future object, we can call its result method to get the coroutine’s return value. The call blocks the calling thread until the coroutine has finished.
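
Since result blocks the calling thread, it can help to bound the wait. The sketch below is one possible way to do that, not part of the class above: it passes a timeout to result and cancels the future when the timeout expires. The names slow_operation, run_with_timeout and demo are hypothetical and exist only for illustration.

```python
import asyncio
import concurrent.futures
import threading


async def slow_operation(delay):
    # Hypothetical stand-in for a slow network call.
    await asyncio.sleep(delay)
    return "done"


def run_with_timeout(loop, coro, timeout):
    """Submit coro to a loop running in another thread and wait at most `timeout` seconds."""
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    try:
        return future.result(timeout=timeout)
    except concurrent.futures.TimeoutError:
        # Cancelling the concurrent future also requests cancellation
        # of the task running inside the event loop.
        future.cancel()
        return None


def demo():
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        # This call times out long before the coroutine would finish.
        slow = run_with_timeout(loop, slow_operation(5), timeout=0.05)
        # This call finishes well within its timeout.
        fast = run_with_timeout(loop, slow_operation(0.01), timeout=1)
        return slow, fast
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()
```

Returning None on timeout is an arbitrary choice for this sketch; re-raising the exception may suit real code better.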

The following example shows how to use it.

async def hello_world():
    print("hello world")

async def make_request():
    await asyncio.sleep(1)

thr = AsyncioEventLoopThread()
thr.start()
try:
    thr.run_coro(hello_world())
    thr.run_coro(make_request())
finally:
    thr.stop()
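
Each blocking submission waits for its coroutine to finish, so coroutines submitted one after another run sequentially. To actually overlap many I/O operations, one option is to submit a single coroutine that fans out with asyncio.gather. The sketch below assumes a plain thread running the loop rather than the class above, and fake_request, fetch_all and demo are hypothetical names.

```python
import asyncio
import threading
import time


async def fake_request(i):
    # Hypothetical stand-in for an I/O-bound call such as an HTTP request.
    await asyncio.sleep(0.2)
    return i * 2


async def fetch_all(n):
    # gather() runs the coroutines concurrently on the loop executing fetch_all
    # and returns their results in submission order.
    return await asyncio.gather(*(fake_request(i) for i in range(n)))


def demo(n=5):
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        started = time.monotonic()
        results = asyncio.run_coroutine_threadsafe(fetch_all(n), loop).result()
        elapsed = time.monotonic() - started
        return results, elapsed
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()
```

With the class from this post, the equivalent call would be thr.run_coro(fetch_all(5)). The five simulated requests should take roughly as long as one, not five times as long.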

Note: don’t run the same coroutine object in two different event loops.
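
One way to read this warning is that a coroutine object can be driven to completion only once, so handing the same object to a second loop fails. The sketch below illustrates that reading with two hypothetical loops, loop_a and loop_b. It does not cover other cross-loop problems, such as sharing loop-bound objects.

```python
import asyncio


async def hello():
    return "hello"


def demo():
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()
    try:
        coro = hello()
        first = loop_a.run_until_complete(coro)
        try:
            # Reusing the already-finished coroutine object in another loop fails.
            loop_b.run_until_complete(coro)
        except RuntimeError as exc:
            second = f"RuntimeError: {exc}"
        else:
            second = "no error"
        # Calling hello() again creates a fresh coroutine object, which works.
        third = loop_b.run_until_complete(hello())
        return first, second, third
    finally:
        loop_a.close()
        loop_b.close()
```

The practical rule is to call the coroutine function again each time a new run is needed.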

Share objects between coroutines

To share objects between coroutines, either subclass AsyncioEventLoopThread or modify it so that it holds the objects you need.

When using contextvars to share objects between coroutines, the shared values must be set in the current context (contextvars.Context) before the coroutines are run.
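
As a minimal illustration of that rule without aiohttp, the sketch below sets a context variable in the calling thread and reads it from a coroutine running in the loop thread. It assumes the standard asyncio event loop, which copies the caller’s context when the coroutine is submitted with asyncio.run_coroutine_threadsafe. The names var_name, read_name and demo are hypothetical.

```python
import asyncio
import contextvars
import threading

var_name = contextvars.ContextVar("name", default="unset")


async def read_name():
    # Runs in the loop thread, inside a copy of the submitting thread's context.
    return var_name.get()


def demo():
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    try:
        # Submitted before the value is set: the coroutine sees the default.
        before = asyncio.run_coroutine_threadsafe(read_name(), loop).result()
        token = var_name.set("shared")
        try:
            # Submitted after the value is set: the coroutine sees "shared".
            after = asyncio.run_coroutine_threadsafe(read_name(), loop).result()
        finally:
            var_name.reset(token)
        return before, after
    finally:
        loop.call_soon_threadsafe(loop.stop)
        thread.join()
        loop.close()
```

A value set after a coroutine has been submitted is not visible to that coroutine, which is why the value has to be set first.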

import contextvars
import aiohttp

var_session = contextvars.ContextVar('session')

class FetcherThread(AsyncioEventLoopThread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.session = None  # created by _create_session() inside the loop thread
        self._event_session_created = threading.Event()

    def run_coro(self, coro):
        self._event_session_created.wait()
        var_session.set(self.session)
        return super().run_coro(coro)

    async def _create_session(self):
        self.session = aiohttp.ClientSession()

    async def _close_session(self):
        await self.session.close()

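    def run(self):
        # Schedule session creation before the loop starts; it runs as soon as
        # run_forever() begins, and the event then unblocks callers of run_coro().
        fut = asyncio.run_coroutine_threadsafe(self._create_session(), loop=self.loop)
        fut.add_done_callback(lambda _: self._event_session_created.set())
        super().run()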

    def stop(self):
        self.run_coro(self._close_session())
        super().stop()

The following example fetches the HTML source of github.com.

async def make_request():
    session = var_session.get()
    async with session.get("https://github.com") as resp:
        resp.raise_for_status()
        return await resp.text()


thr = FetcherThread()
thr.start()
try:
    text = thr.run_coro(make_request())
    print(text)
finally:
    thr.stop()
