How do I correctly create and run concurrent tasks using Python's asyncio module?

I'm trying to properly understand and implement two concurrently running Task objects using Python 3's relatively new asyncio module.

In a nutshell, asyncio seems designed to handle asynchronous processes and concurrent Task execution over an event loop. It promotes the use of await (applied to async functions) as a callback-free way to wait for and use a result without blocking the event loop. (Futures and callbacks are still a viable alternative.)

It also provides the asyncio.Task() class, a specialized subclass of Future designed to wrap coroutines, preferably created by calling asyncio.ensure_future(). The intended use of asyncio Tasks is to allow independently running tasks to run 'concurrently' with other tasks within the same event loop. My understanding is that Tasks are connected to the event loop, which then automatically keeps driving the coroutine between await statements.

I like the idea of being able to use concurrent Tasks without needing one of the Executor classes, but I haven't found much detail on how to implement this.

This is how I'm doing it at the moment:

import asyncio

print('running async test')


async def say_boo():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...boo {0}'.format(i))
        i += 1


async def say_baa():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...baa {0}'.format(i))
        i += 1


# wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

When trying to run the two looping Tasks concurrently, I've noticed that unless a Task has an internal await expression, it gets stuck in its while loop, effectively blocking other tasks from running (much like a normal while loop would). However, as soon as the Tasks have to (a)wait, they seem to run concurrently without a problem.

So the await statements seem to give the event loop a foothold for switching back and forth between the tasks, producing the effect of concurrency.

Example output with an internal await:

running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2

Example output without an internal await:

...boo 0
...boo 1
...boo 2
...boo 3
...boo 4
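
For reference, the variant that produces this second output simply drops the internal await and leaves everything else unchanged:

async def say_boo():
    i = 0
    while True:
        # no await here, so the event loop never gets a chance to run say_baa
        print('...boo {0}'.format(i))
        i += 1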

Questions

Does this implementation pass as a 'proper' example of concurrent looping Tasks in asyncio?

Is it correct that the only way this works is for a Task to provide a blocking point (an await expression) so the event loop can juggle multiple tasks?

Edit

Update 2022: Note that the asyncio API has changed considerably since this question was asked. See the newly accepted answer, which now shows the proper use of the API as of Python 3.10. I still recommend @dano's answer for a broader understanding of how this works under the hood.


Yes, any coroutine that's running inside your event loop will block other coroutines and tasks from running, unless it

  1. Calls another coroutine using yield from or await (if using Python 3.5+).
  2. Returns.

This is because asyncio is single-threaded; the only way for the event loop to run is for no other coroutine to be actively executing. Using yield from/await suspends the coroutine temporarily, giving the event loop a chance to work.

Your example code is fine, but in many cases, you probably wouldn't want long-running code that isn't doing asynchronous I/O running inside the event loop to begin with. In those cases, it often makes more sense to use asyncio.loop.run_in_executor to run the code in a background thread or process. ProcessPoolExecutor would be the better choice if your task is CPU-bound, ThreadPoolExecutor would be used if you need to do some I/O that isn't asyncio-friendly.

Your two loops, for example, are completely CPU-bound and don't share any state, so the best performance would come from using ProcessPoolExecutor to run each loop in parallel across CPUs:

import asyncio
from concurrent.futures import ProcessPoolExecutor

print('running async test')


def say_boo():
    i = 0
    while True:
        print('...boo {0}'.format(i))
        i += 1


def say_baa():
    i = 0
    while True:
        print('...baa {0}'.format(i))
        i += 1


if __name__ == "__main__":
    executor = ProcessPoolExecutor(2)
    loop = asyncio.new_event_loop()
    boo = loop.run_in_executor(executor, say_boo)
    baa = loop.run_in_executor(executor, say_baa)

    loop.run_forever()
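
If the blocking work were I/O-bound rather than CPU-bound (say, calls into a library that isn't asyncio-aware), a ThreadPoolExecutor fits better. Here's a minimal sketch using time.sleep as a hypothetical stand-in for the blocking call, written against the newer asyncio.run-style API:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def fetch_blocking(name):
    # hypothetical blocking I/O that knows nothing about asyncio
    time.sleep(1)
    return 'result from {0}'.format(name)


async def main():
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(2)
    # both calls run in worker threads; the event loop stays free while we await them
    results = await asyncio.gather(
        loop.run_in_executor(executor, fetch_blocking, 'boo'),
        loop.run_in_executor(executor, fetch_blocking, 'baa'),
    )
    print(results)


asyncio.run(main())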

You don't necessarily need a yield from x to give control over to the event loop.

In your example, I think the proper way would be to do a yield None or equivalently a simple yield, rather than a yield from asyncio.sleep(0.001):

import asyncio


@asyncio.coroutine
def say_boo():
    i = 0
    while True:
        yield None
        print("...boo {0}".format(i))
        i += 1


@asyncio.coroutine
def say_baa():
    i = 0
    while True:
        yield
        print("...baa {0}".format(i))
        i += 1


# asyncio.async() was renamed to asyncio.ensure_future() in Python 3.4.4
boo_task = asyncio.async(say_boo())
baa_task = asyncio.async(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

Coroutines are just plain old Python generators. Internally, the asyncio event loop keeps a record of these generators and calls gen.send() on each of them one by one in a never ending loop. Whenever you yield, the call to gen.send() completes and the loop can move on. (I'm simplifying it; take a look around https://hg.python.org/cpython/file/3.4/Lib/asyncio/tasks.py#l265 for the actual code)
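
To picture that mechanism, here is a toy version of what the loop does, driving two plain generators by hand with send() (a sketch of the idea only, not real asyncio code):

def gen_boo():
    i = 0
    while True:
        yield                      # hand control back to whoever called send()
        print('...boo {0}'.format(i))
        i += 1


def gen_baa():
    i = 0
    while True:
        yield
        print('...baa {0}'.format(i))
        i += 1


tasks = [gen_boo(), gen_baa()]
for gen in tasks:
    gen.send(None)                 # prime each generator up to its first yield
for _ in range(3):
    for gen in tasks:
        gen.send(None)             # resume the body until the next yield

This prints boo and baa alternating, which is essentially the scheduling behavior you observed.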

That said, I would still go the run_in_executor route if you need to do CPU intensive computation without sharing data.

As of Python 3.10, calling asyncio.get_event_loop without a running event loop is deprecated, and asyncio.create_task (available since Python 3.7) is preferred over asyncio.ensure_future for scheduling coroutines.

You can run the two coroutines say_boo and say_baa concurrently through asyncio.create_task:

async def main():
    boo = asyncio.create_task(say_boo())
    baa = asyncio.create_task(say_baa())
    await boo
    await baa


asyncio.run(main())

You can also use asyncio.gather:

async def main():
    await asyncio.gather(say_boo(), say_baa())


asyncio.run(main())
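
On Python 3.11 and later, asyncio.TaskGroup is another option for running the two coroutines concurrently with structured cancellation; a minimal sketch:

async def main():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(say_boo())
        tg.create_task(say_baa())


asyncio.run(main())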