How can I wrap a synchronous function in an async coroutine?

I'm using aiohttp to build an API server that sends TCP requests off to a separate server. The module that sends the TCP requests is synchronous and a black box for my purposes, so these requests block the entire API. I need a way to wrap the module's requests in an asynchronous coroutine that won't block the rest of the API.

So, just using sleep as a simple example, is there any way to somehow wrap time-consuming synchronous code in a non-blocking coroutine, something like this:

async def sleep_async(delay):
    # After calling sleep, the loop should be released until sleep is done
    yield sleep(delay)
    return 'I slept asynchronously'

Eventually I found an answer in this thread. The method I was looking for is run_in_executor. It allows a synchronous function to be run asynchronously without blocking the event loop.

In the sleep example I posted above, it might look like this:

import asyncio
from time import sleep


async def sleep_async(loop, delay):
    # None uses the default executor (ThreadPoolExecutor)
    await loop.run_in_executor(None, sleep, delay)
    return 'I slept asynchronously'
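
A minimal usage sketch (get_running_loop and asyncio.run assume Python 3.7+):

async def main():
    loop = asyncio.get_running_loop()
    result = await sleep_async(loop, 1)
    print(result)

asyncio.run(main())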

Also see the following answer -> How do we call a normal function where a coroutine is expected?

You can use a decorator to wrap the sync version into an async version.

import time
import asyncio
from functools import wraps, partial


def wrap(func):
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    return run


@wrap
def sleep_async(delay):
    time.sleep(delay)
    return 'I slept asynchronously'
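
The decorated function can then be awaited like any other coroutine; for example (asyncio.run assumes Python 3.7+):

async def main():
    print(await sleep_async(1))

asyncio.run(main())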

Outdated: aioify is in maintenance mode

or use the aioify library

% pip install aioify

then

from aioify import aioify


@aioify
def sleep_async(delay):
    pass

Not sure if it's too late, but you can also use a decorator to run your function in a thread. Note, though, that the function itself still blocks its thread (non-cooperative blocking), unlike native async code, which blocks cooperatively.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import wraps


def wrap(func):
    pool = ThreadPoolExecutor()

    @wraps(func)
    async def run(*args, **kwargs):
        future = pool.submit(func, *args, **kwargs)
        # Bridge the concurrent.futures.Future into an awaitable asyncio future
        return await asyncio.wrap_future(future)
    return run
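
A short usage sketch of this decorator (the sleep_async name simply mirrors the earlier examples):

import time


@wrap
def sleep_async(delay):
    time.sleep(delay)
    return 'I slept asynchronously'


async def main():
    # Both calls are submitted to the pool, so this takes about 1 second, not 2
    results = await asyncio.gather(sleep_async(1), sleep_async(1))
    print(results)

asyncio.run(main())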

Maybe someone will find my solution to this problem useful. I wrote my own library for it, which allows you to make any function asynchronous using a decorator.

To install the library, run this command:

$ pip install awaits

To make any of your functions asynchronous, just add the @awaitable decorator to it, like this:

import time
import asyncio
from awaits.awaitable import awaitable


@awaitable
def sum(a, b):
    # heavy load simulation
    time.sleep(10)
    return a + b

Now you can check that your function really is an asynchronous coroutine:

print(asyncio.run(sum(2, 2)))

"Under the hood" your function will be executed in the thread pool. This thread pool will not be recreated every time your function is called. A thread pool is created once and accepts new tasks via a queue. This will make your program run faster than using other solutions, because the creation of additional threads is an additional overhead.

The following decorator can also be useful for this case; it runs your blocking function in another thread.

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import wraps, partial
from typing import Optional


class to_async:

    def __init__(self, *, executor: Optional[ThreadPoolExecutor] = None):
        self.executor = executor

    def __call__(self, blocking):
        @wraps(blocking)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_event_loop()
            if not self.executor:
                self.executor = ThreadPoolExecutor()
            func = partial(blocking, *args, **kwargs)
            return await loop.run_in_executor(self.executor, func)
        return wrapper


@to_async(executor=None)
def sync(*args, **kwargs):
    print(args, kwargs)


asyncio.run(sync("hello", "world", result=True))


From Python 3.9, the cleanest way to do this is to use the asyncio.to_thread method, which is basically a shortcut for run_in_executor but also keeps all the contextvars.

Also, please keep the GIL in mind, since this just runs the function in a thread. You can still run CPU-bound tasks with something like numpy, which releases the GIL for many operations. From the docs:

Note: Due to the GIL, asyncio.to_thread() can typically only be used to make IO-bound functions non-blocking. However, for extension modules that release the GIL or alternative Python implementations that don’t have one, asyncio.to_thread() can also be used for CPU-bound functions.
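
For the sleep example above, a minimal sketch with asyncio.to_thread (Python 3.9+) could look like this:

import asyncio
from time import sleep


async def sleep_async(delay):
    # to_thread runs the blocking call in a separate thread without blocking the loop
    await asyncio.to_thread(sleep, delay)
    return 'I slept asynchronously'


async def main():
    print(await sleep_async(1))

asyncio.run(main())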