asyncio实际上是如何工作的?

这个问题是由我的另一个问题如何在cdef等待?引起的

网上有大量关于asyncio的文章和博客文章,但它们都非常肤浅。我找不到任何关于asyncio如何实际实现的信息,以及什么使I/O异步。我试图阅读源代码,但它有数千行不是最高级的C代码,其中很多处理辅助对象,但最重要的是,它很难将Python语法和它将转换成的C代码联系起来。

Asycnio自己的文档就更没有帮助了。这里没有关于它如何工作的信息,只有一些关于如何使用它的指南,这些指南有时也会误导/写得很糟糕。

我熟悉Go的协程实现,并希望Python也能做同样的事情。如果是这样的话,我在上面链接的帖子中出现的代码应该是有效的。既然它没有,我现在正试图找出原因。到目前为止,我最好的猜测如下,请纠正我的错误:

  1. 形式为async def foo(): ...的过程定义实际上被解释为继承coroutine的类的方法。
  2. 也许,async def实际上被await语句分割成多个方法,其中调用这些方法的对象能够跟踪到目前为止执行的进度。
  3. 如果以上是正确的,那么,本质上,协程的执行归结为通过某个全局管理器调用协程对象的方法(循环?)
  4. 全局管理器以某种方式(如何?)意识到Python代码何时执行I/O操作,并且能够在当前执行方法放弃控制(命中await语句)后选择一个暂挂的协程方法来执行。

换句话说,这是我试图将一些asyncio语法“糖化”成更容易理解的东西:

async def coro(name):
print('before', name)
await asyncio.sleep()
print('after', name)


asyncio.gather(coro('first'), coro('second'))


# translated from async def coro(name)
class Coro(coroutine):
def before(self, name):
print('before', name)


def after(self, name):
print('after', name)


def __init__(self, name):
self.name = name
self.parts = self.before, self.after
self.pos = 0


def __call__():
self.parts[self.pos](self.name)
self.pos += 1


def done(self):
return self.pos == len(self.parts)




# translated from asyncio.gather()
class AsyncIOManager:


def gather(*coros):
while not every(c.done() for c in coros):
coro = random.choice(coros)
coro()

如果我猜对了,那我就有麻烦了。在这种情况下,I/O实际上是如何发生的?在一个单独的线程?整个解释器挂起,I/O发生在解释器之外吗?I/O到底是什么意思?如果我的python过程称为c__abc0过程,它反过来发送中断到内核,放弃控制它,python解释器如何知道这一点,并能够继续运行一些其他代码,而内核代码进行实际的I/O,直到它唤醒最初发送中断的python过程?原则上,Python解释器如何意识到这种情况?

64458 次浏览

这一切都归结为asyncio正在解决的两个主要挑战:

  • 如何在一个线程中执行多个I/O ?
  • 如何实现协同多任务处理?

第一点的答案已经存在很长一段时间了,被称为选择循环。在python中,它在选择器模块. xml中实现。

第二个问题与协同程序的概念有关,即可以停止执行并稍后恢复的函数。在python中,协程是使用发电机产量从语句实现的。这就是隐藏在异步/等待语法后面的东西。

这个回答中有更多的资源。


编辑:解决你对goroutines的评论:

在asyncio中最接近于goroutine的实际上不是协程,而是任务(参见文档中的区别)。在python中,协程(或生成器)不知道事件循环或I/O的概念。它只是一个函数,可以使用yield停止执行,同时保持其当前状态,因此以后可以恢复它。yield from语法允许以透明的方式链接它们。

现在,在asyncio任务中,链的最底部的协程总是最终产生未来。然后,这个future出现在事件循环中,并集成到内部机制中。当future被其他内部回调设置为done时,事件循环可以通过将future发送回协程链来恢复任务。


编辑:在你的帖子中解决一些问题:

在这种情况下,I/O实际上是如何发生的?在一个单独的线程?整个解释器挂起,I/O发生在解释器之外吗?

不,线程中没有发生任何事情。I/O总是由事件循环管理,主要是通过文件描述符。然而,这些文件描述符的注册通常被高级协程隐藏,这就为您带来了麻烦。

I/O到底是什么意思?如果我的python过程调用C open()过程,它反过来将中断发送给内核,放弃对它的控制,python解释器如何知道这一点,并能够继续运行一些其他代码,而内核代码进行实际的I/O,直到它唤醒最初发送中断的python过程?原则上,Python解释器如何意识到这种情况?

I/O是任何阻塞调用。在asyncio中,所有的I/O操作都应该经过事件循环,因为如您所说,事件循环无法意识到某些同步代码中正在执行阻塞调用。这意味着你不应该在协程的上下文中使用同步open。相反,应该使用专门的库,例如aiofiles,它提供了open的异步版本。

你的coro去糖在概念上是正确的,但有点不完整。

await不会无条件挂起,只有当它遇到阻塞调用时才会挂起。它是如何知道呼叫被阻塞的?这是由正在等待的代码决定的。例如,socket read的可等待实现可以被糖化为:

def read(sock, n):
# sock must be in non-blocking mode
try:
return sock.recv(n)
except EWOULDBLOCK:
event_loop.add_reader(sock.fileno, current_task())
return SUSPEND

在实际asyncio中,等价的代码修改Future的状态,而不是返回神奇的值,但概念是相同的。当适当地适应一个类似生成器的对象时,上面的代码可以被awaited。

在调用方,当你的协程包含:

data = await read(sock, 1024)

它糖化成类似的东西:

data = read(sock, 1024)
if data is SUSPEND:
return SUSPEND
self.pos += 1
self.parts[self.pos](...)

熟悉生成器的人倾向于用yield from来描述上面的内容,它会自动挂起。

挂起链一直延续到事件循环,该事件循环注意到协程被挂起,将其从可运行集中移除,并继续执行可运行的协程(如果有的话)。如果没有可运行的协程,循环将在select()中等待,直到协程感兴趣的文件描述符准备好进行IO或超时。(事件循环维护一个文件描述符到协程的映射。)

在上面的例子中,一旦select()告诉事件循环sock是可读的,它将重新将coro添加到可运行集,因此它将从挂起点继续执行。

换句话说:

  1. 一切默认发生在同一个线程中。

  2. 事件循环负责调度协程,并在协程正在等待的任何事情(通常是一个通常会阻塞的IO调用或超时)准备就绪时唤醒协程。

为了深入了解协程驱动事件循环,我推荐Dave Beazley的这个演讲,他在现场观众面前演示了从头开始编写事件循环。

asyncio是如何工作的?

在回答这个问题之前,我们需要了解一些基本术语,如果你已经知道其中任何一个,就跳过这些。

发电机

生成器是允许我们暂停python函数执行的对象。用户管理生成器使用关键字< >强yield < / >强实现。通过创建一个包含yield关键字的普通函数,我们将该函数转换为生成器:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration

如你所见,在生成器上调用next()会导致解释器加载测试的帧,并返回yielded值。再次调用next(),会导致该帧再次加载到解释器堆栈中,并继续__abc1获取另一个值。

在第三次调用next()时,生成器结束,并抛出StopIteration

与发电机通信

生成器的一个鲜为人知的特性是,您可以使用两种方法与它们通信:send()throw()

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 4, in test
Exception

在调用gen.send()时,该值将作为yield关键字的返回值传递。

另一方面,gen.throw()允许在生成器内部抛出异常,在调用yield的同一位置引发异常。

从生成器返回值

从生成器返回一个值,会导致该值被放入StopIteration异常中。我们可以稍后从异常中恢复值,并将其用于我们的需要。

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

注意,一个新的关键字:yield from

Python 3.4添加了一个新的关键字:yield from。该关键字允许我们做的是将任何next()send()throw()传递到最内部嵌套的生成器中。如果内部生成器返回一个值,它也是yield from的返回值:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

我已经写了一篇文章来进一步阐述这个主题。

把它们放在一起

在Python 3.4中引入新的关键字yield from后,我们现在能够在生成器中创建生成器,就像隧道一样,将数据从最里面的生成器来回传递到最外面的生成器。这为生成器带来了一个新的含义——协同程序

协同程序是可以在运行时停止和恢复的函数。在Python中,它们是使用async def关键字定义的。就像生成器一样,它们也使用自己的yield from形式,即await。在Python 3.5中引入asyncawait之前,我们以与生成器创建完全相同的方式创建协程(使用yield from而不是await)。

async def inner():
return 1


async def outer():
await inner()

就像所有迭代器和生成器都实现了__iter__()方法一样,所有协程都实现了__await__(),这允许它们在每次调用await coro时继续执行。

Python文档中有一个很好的序列图,你应该检查一下。

在asyncio中,除了协程函数,我们还有两个重要的对象:任务期货

期货

期货是实现了__await__()方法的对象,它们的工作是保存某种状态和结果。状态可以是以下状态之一:

  1. PENDING - future没有任何结果或异常集。
  2. CANCELLED -使用fut.cancel()取消future
  3. FINISHED - future被结束,要么是由使用fut.set_result()的结果集完成,要么是由使用fut.set_exception()的异常集完成

正如您所猜测的那样,结果可以是一个将返回的Python对象,也可以是一个可能引发的异常。

future对象的另一个重要的特性是它们包含一个名为add_done_callback()的方法。此方法允许在任务完成时立即调用函数——无论它引发异常还是完成。

Tasks .

任务对象是特殊的期货,它围绕着协程,并与最内部和最外部的协程通信。每次协程__abc0一个future时,future就会一直传递回任务(就像在yield from中一样),然后任务接收它。

接下来,任务将自己绑定到未来。它通过在future对象上调用add_done_callback()来实现。从现在开始,如果将来要完成,无论是取消,传递异常,还是传递一个Python对象,任务的回调将被调用,它将上升到存在。

Asyncio

我们必须回答的最后一个紧迫问题是——IO是如何实现的?

在asyncio的深处,我们有一个事件循环。任务的事件循环。事件循环的工作是在每次任务准备就绪时调用它们,并将所有工作协调到一个工作机器中。

事件循环的IO部分构建在一个名为select的关键函数之上。Select是一个阻塞函数,由下面的操作系统实现,允许在套接字上等待传入或传出数据。在接收到数据时,它将被唤醒,并返回接收到数据的套接字或准备写入的套接字。

当您尝试通过asyncio通过套接字接收或发送数据时,下面实际发生的情况是,首先检查套接字是否有任何可以立即读取或发送的数据。如果套接字的.send()缓冲区已满,或者.recv()缓冲区为空,则套接字被注册到select函数(只需将其添加到其中一个列表中,rlist用于recvwlist用于send),相应的函数awaits一个新创建的future对象,绑定到该套接字。

当所有可用任务都在等待未来时,事件循环调用select并等待。当其中一个套接字有传入数据,或者它的send缓冲区耗尽时,asyncio检查绑定到该套接字的未来对象,并将其设置为done。

现在奇迹发生了。未来已设置完成,之前用add_done_callback()添加自己的任务将复活,并在协程上调用.send(),该协程恢复最内部的协程(因为await链),并且您从它溢出到的附近缓冲区读取新接收的数据。

方法链,在recv()的情况下:

  1. select.select等待。
  2. 返回一个就绪的套接字,其中包含数据。
  3. 来自套接字的数据被移动到缓冲区中。
  4. future.set_result()被调用。
  5. add_done_callback()添加自己的任务现在被唤醒。
  6. Task在协程上调用.send(),它会一直进入最内部的协程并唤醒它。
  7. 数据从缓冲区读取并返回给我们的普通用户。

总之,asyncio使用生成器功能,允许暂停和恢复函数。它使用yield from功能,允许从最内部的生成器来回传递数据到最外部的生成器。它使用所有这些方法是为了在等待IO完成时暂停函数执行(通过使用OS select函数)。

最好的是什么?当一个函数暂停时,另一个函数可能会运行并与精致的结构交织,这是asyncio的。

谈论async/awaitasyncio不是一回事。前者是一个基本的、低级的构造(协程),而后者是一个使用这些构造的库。相反,没有唯一的最终答案。

下面是async/await和__abc1类库如何工作的一般描述。也就是说,上面可能还有其他技巧(有……),但它们都是无关紧要的,除非你自己创建它们。这种差异应该可以忽略不计,除非你已经知道得足够多,不必问这样的问题。

1. 简而言之,协程与子例程

就像子例程(函数,过程,…)一样,协同程序(生成器,…)是调用堆栈和指令指针的抽象:有一个执行代码段的堆栈,每个代码段都位于特定的指令上。

defasync def的区别只是为了清晰。实际的区别是returnyield。由此,awaityield from从单个调用到整个堆栈。

1.1. 子例程

子例程表示一个新的堆栈级别,以保存局部变量,并对其指令进行一次遍历以达到结束。考虑这样的子程序:

def subfoo(bar):
qux = 3
return qux * bar

当你运行它时,这意味着

  1. barqux分配堆栈空间
  2. 递归地执行第一个语句并跳转到下一个语句
  3. 一旦到达return,将其值推入调用堆栈
  4. 清除堆栈(1.)和指令指针(2.)

值得注意的是,4。意味着子程序总是以相同的状态开始。函数本身专有的所有内容在完成时都将丢失。函数不能被恢复,即使在return之后有指令。

root -\
:    \- subfoo --\
:/--<---return --/
|
V

1.2. 协程作为持久子例程

协程类似于子例程,但可以退出没有破坏其状态。考虑这样一个协程:

 def cofoo(bar):
qux = yield bar  # yield marks a break point
return qux

当你运行它时,这意味着

  1. barqux分配堆栈空间
  2. 递归执行第一个语句并跳转到下一个语句
    1. 一旦到达yield,将其值推入调用堆栈但是存储堆栈和指令指针
    2. 一旦调用yield,恢复堆栈和指令指针,并将参数推入qux
  3. 一旦到达return,将其值推入调用堆栈
  4. 清除堆栈(1.)和指令指针(2.)

注意2.1和2.2的添加—协程可以在预定义的点挂起和恢复。这与子例程在调用另一个子例程时挂起类似。不同之处在于活动协程没有严格绑定到它的调用堆栈。相反,挂起的协程是一个单独的、孤立的堆栈的一部分。

root -\
:    \- cofoo --\
:/--<+--yield --/
|    :
V    :

这意味着挂起的协程可以在堆栈之间自由存储或移动。任何访问协程的调用栈都可以决定恢复它。

1.3. 遍历调用堆栈

到目前为止,我们的协程只在调用堆栈中使用yield。子例程可以使用return()向下调用堆栈。为了完整起见,协程还需要一种机制来上升到调用堆栈。考虑这样一个协程:

def wrap():
yield 'before'
yield from cofoo()
yield 'after'

当您运行它时,这意味着它仍然像子例程一样分配堆栈和指令指针。当它挂起时,这仍然像存储一个子程序。

然而,yield from执行这两个。它挂起堆栈,wrap的指令指针而且运行cofoo。注意,wrap会一直挂起,直到cofoo完全完成。每当cofoo挂起或发送一些东西时,cofoo直接连接到调用堆栈。

1.4. 协程一直到下面

yield from允许将两个作用域连接到另一个中间作用域。当递归应用时,这意味着堆栈的可以连接到堆栈的

root -\
:    \-> coro_a -yield-from-> coro_b --\
:/ <-+------------------------yield ---/
|    :
:\ --+-- coro_a.send----------yield ---\
:                             coro_b <-/

注意rootcoro_b彼此不知道。这使得协程比回调简洁得多:协程仍然像子例程一样建立在1:1的关系上。协程挂起并恢复其整个现有的执行堆栈,直到常规调用点。

值得注意的是,root可以有任意数量的协程来恢复。然而,它永远不能同时恢复多个。相同根的协程是并发的,但不是并行的!

1.5. Python的asyncawait

到目前为止,解释已经显式地使用生成器的yieldyield from词汇表-底层功能是相同的。新的Python3.5语法asyncawait的存在主要是为了清晰。

def foo():  # subroutine?
return None


def foo():  # coroutine?
yield from foofoo()  # generator? coroutine?


async def foo():  # coroutine!
await foofoo()  # coroutine!
return None

async forasync with语句是必需的,因为你会用纯粹的forwith语句打破yield from/await链。

2. 一个简单事件循环的剖析

协程本身没有将控制权交给另一个协程的概念。它只能在协程堆栈的底部将控制权交给调用方。然后,调用者可以切换到另一个协程并运行它。

几个协程的根节点通常是事件循环:在挂起时,协程会产生一个它想要恢复的事件。反过来,事件循环能够有效地等待这些事件发生。这允许它决定接下来运行哪个协程,或者在恢复之前如何等待。

这样的设计意味着循环可以理解一组预定义的事件。几个协程彼此await,直到最后一个事件awaited。该事件可以通过yielding控件与事件循环进行直接通信。

loop -\
:    \-> coroutine --await--> event --\
:/ <-+----------------------- yield --/
|    :
|    :  # loop waits for event to happen
|    :
:\ --+-- send(reply) -------- yield --\
:        coroutine <--yield-- event <-/

关键是协程挂起允许事件循环和事件直接通信。中间协程堆栈不需要任何关于哪个循环正在运行它,也不需要事件如何工作的知识。

2.1.1. 时间事件

最简单的事件是到达某个时间点。这也是线程代码的一个基本块:线程重复sleeps,直到条件为真。 然而,常规的sleep本身会阻塞执行——我们希望其他协程不被阻塞。相反,我们想告诉事件循环什么时候应该恢复当前的协程堆栈

2.1.2. 定义事件

事件只是一个我们可以识别的值——通过枚举、类型或其他标识。我们可以用一个存储目标时间的简单类来定义它。除了存储事件信息外,我们还可以允许直接await类。

class AsyncSleep:
"""Event to sleep until a point in time"""
def __init__(self, until: float):
self.until = until


# used whenever someone ``await``s an instance of this Event
def __await__(self):
# yield this Event to the loop
yield self
    

def __repr__(self):
return '%s(until=%.1f)' % (self.__class__.__name__, self.until)

这个类只商店事件-它没有说如何实际处理它。

唯一的特殊特性是__await__ -它是await关键字所寻找的。实际上,它是一个迭代器,但不适用于常规迭代机制。

2.2.1. 等待事件

现在我们有了一个事件,协程如何对它做出反应?我们应该能够通过awaiting我们的事件来表达与sleep等价的内容。为了更好地了解发生了什么,有一半的时间我们会等待两次:

import time


async def asleep(duration: float):
"""await that ``duration`` seconds pass"""
await AsyncSleep(time.time() + duration / 2)
await AsyncSleep(time.time() + duration / 2)

我们可以直接实例化并运行这个协程。与生成器类似,使用coroutine.send运行协程,直到yields得到结果。

coroutine = asleep(100)
while True:
print(coroutine.send(None))
time.sleep(0.1)

这给了我们两个AsyncSleep事件,然后在协程完成时给了我们一个StopIteration事件。注意,唯一的延迟来自循环中的time.sleep !每个AsyncSleep只存储当前时间的偏移量。

2.2.2. 事件+睡眠

此时,我们有两个单独的机制供我们使用:

  • AsyncSleep可从协程内部产生的事件
  • time.sleep可以等待而不影响协程

值得注意的是,这两者是正交的:一个不会影响或触发另一个。因此,我们可以提出自己的策略sleep来满足AsyncSleep的延迟。

2.3. 一个简单的事件循环

如果我们有几个协程,每个协程都可以告诉我们它什么时候想被唤醒。然后,我们可以等待其中的第一个,然后等待下一个,依此类推。值得注意的是,在每个点上,我们只关心哪一个是下一个

这就形成了一个简单的调度:

  1. 按所需的唤醒时间对协程进行排序
  2. 选第一个想醒来的
  3. 等到这个时间点
  4. 运行这个协程
  5. 从1开始重复。

简单的实现不需要任何高级概念。list允许根据日期对协程进行排序。等待是常规的time.sleep。运行协程就像之前使用coroutine.send一样。

def run(*coroutines):
"""Cooperatively run all ``coroutines`` until completion"""
# store wake-up-time and coroutines
waiting = [(0, coroutine) for coroutine in coroutines]
while waiting:
# 2. pick the first coroutine that wants to wake up
until, coroutine = waiting.pop(0)
# 3. wait until this point in time
time.sleep(max(0.0, until - time.time()))
# 4. run this coroutine
try:
command = coroutine.send(None)
except StopIteration:
continue
# 1. sort coroutines by their desired suspension
if isinstance(command, AsyncSleep):
waiting.append((command.until, coroutine))
waiting.sort(key=lambda item: item[0])

当然,这方面还有很大的改进空间。我们可以为等待队列使用堆,或为事件使用分派表。我们也可以从StopIteration中获取返回值,并将它们分配给协程。然而,基本原则是不变的。

2.4. 合作等

AsyncSleep事件和run事件循环是计时事件的完全工作实现。

async def sleepy(identifier: str = "coroutine", count=5):
for i in range(count):
print(identifier, 'step', i + 1, 'at %.2f' % time.time())
await asleep(0.1)


run(*(sleepy("coroutine %d" % j) for j in range(5)))

这会在五个协程之间进行合作切换,每个协程暂停0.1秒。即使事件循环是同步的,它仍然在0.5秒而不是2.5秒内执行工作。每个协程保存状态并独立行动。

3.I/O事件循环

支持sleep的事件循环适用于轮询。然而,在文件句柄上等待I/O可以更有效地完成:操作系统实现I/O,因此知道哪些句柄已经准备好了。理想情况下,事件循环应该支持显式的“ready for I/ o”;事件。

3.1. select调用

Python已经有一个接口来查询操作系统的读I/O句柄。当使用句柄来读取或写入时,它返回句柄准备好了来读取或写入:

readable, writable, _ = select.select(rlist, wlist, xlist, timeout)

例如,我们可以open一个文件来写,然后等待它准备好:

write_target = open('/tmp/foo')
readable, writable, _ = select.select([], [write_target], [])

一旦select返回,writable包含我们打开的文件。

3.2. 基本I/O事件

类似于AsyncSleep请求,我们需要为I/O定义一个事件。使用底层的select逻辑,事件必须引用一个可读的对象——比如open文件。此外,我们还存储要读取的数据量。

class AsyncRead:
def __init__(self, file, amount=1):
self.file = file
self.amount = amount
self._buffer = b'' if 'b' in file.mode else ''


def __await__(self):
while len(self._buffer) < self.amount:
yield self
# we only get here if ``read`` should not block
self._buffer += self.file.read(1)
return self._buffer


def __repr__(self):
return '%s(file=%s, amount=%d, progress=%d)' % (
self.__class__.__name__, self.file, self.amount, len(self._buffer)
)

AsyncSleep一样,我们主要只存储底层系统调用所需的数据。这一次,__await__可以被多次恢复,直到我们想要的amount被读取。此外,我们return I/O结果,而不是仅仅恢复。

3.3. 用读I/O增加事件循环

事件循环的基础仍然是前面定义的run。首先,我们需要跟踪读请求。这不再是一个排序的调度,我们只将读请求映射到协程。

# new
waiting_read = {}  # type: Dict[file, coroutine]

由于select.select有一个timeout参数,我们可以用它来代替time.sleep

# old
time.sleep(max(0.0, until - time.time()))
# new
readable, _, _ = select.select(list(waiting_read), [], [])

这为我们提供了所有可读文件——如果有,我们运行相应的协程。如果没有,我们已经等待了足够长的时间来运行当前的协程。

# new - reschedule waiting coroutine, run readable coroutine
if readable:
waiting.append((until, coroutine))
waiting.sort()
coroutine = waiting_read[readable[0]]

最后,我们必须实际监听读请求。

# new
if isinstance(command, AsyncSleep):
...
elif isinstance(command, AsyncRead):
...

3.4. 把它们放在一起

上面的描述有点简单化。我们需要做一些切换,如果我们总能读取,就不会饿死睡眠协程。我们需要面对无书可读、无事可等的现实。但是,最终结果仍然适合30 LOC。

def run(*coroutines):
"""Cooperatively run all ``coroutines`` until completion"""
waiting_read = {}  # type: Dict[file, coroutine]
waiting = [(0, coroutine) for coroutine in coroutines]
while waiting or waiting_read:
# 2. wait until the next coroutine may run or read ...
try:
until, coroutine = waiting.pop(0)
except IndexError:
until, coroutine = float('inf'), None
readable, _, _ = select.select(list(waiting_read), [], [])
else:
readable, _, _ = select.select(list(waiting_read), [], [], max(0.0, until - time.time()))
# ... and select the appropriate one
if readable and time.time() < until:
if until and coroutine:
waiting.append((until, coroutine))
waiting.sort()
coroutine = waiting_read.pop(readable[0])
# 3. run this coroutine
try:
command = coroutine.send(None)
except StopIteration:
continue
# 1. sort coroutines by their desired suspension ...
if isinstance(command, AsyncSleep):
waiting.append((command.until, coroutine))
waiting.sort(key=lambda item: item[0])
# ... or register reads
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine

3.5. 合作的I / O

AsyncSleepAsyncReadrun实现现在完全可以用于睡眠和/或读取。 与sleepy一样,我们可以定义一个帮助器来测试读取:

async def ready(path, amount=1024*32):
print('read', path, 'at', '%d' % time.time())
with open(path, 'rb') as file:
result = await AsyncRead(file, amount)
print('done', path, 'at', '%d' % time.time())
print('got', len(result), 'B')


run(sleepy('background', 5), ready('/dev/urandom'))

运行这个,我们可以看到我们的I/O与等待任务交织在一起:

id background round 1
read /dev/urandom at 1530721148
id background round 2
id background round 3
id background round 4
id background round 5
done /dev/urandom at 1530721148
got 1024 B

4. 非阻塞I / O

虽然文件上的I/O理解了这个概念,但它并不真正适合asyncio这样的库:select调用总是返回文件,而openread都可能块无限期。这会阻塞事件循环的所有协程——这很糟糕。像aiofiles这样的库使用线程和同步来伪造文件上的非阻塞I/O和事件。

然而,套接字确实允许非阻塞I/O,它们固有的延迟使它变得更加关键。当在事件循环中使用时,等待数据和重试可以被包装而不阻塞任何东西。

4.1. 非阻塞I/O事件

类似于AsyncRead,我们可以为套接字定义一个suspend-and-read事件。我们不取文件,而是取套接字——它必须是非阻塞的。同样,我们的__await__使用socket.recv而不是file.read

class AsyncRecv:
def __init__(self, connection, amount=1, read_buffer=1024):
assert not connection.getblocking(), 'connection must be non-blocking for async recv'
self.connection = connection
self.amount = amount
self.read_buffer = read_buffer
self._buffer = b''


def __await__(self):
while len(self._buffer) < self.amount:
try:
self._buffer += self.connection.recv(self.read_buffer)
except BlockingIOError:
yield self
return self._buffer


def __repr__(self):
return '%s(file=%s, amount=%d, progress=%d)' % (
self.__class__.__name__, self.connection, self.amount, len(self._buffer)
)

AsyncRead相比,__await__执行真正的非阻塞I/O。当数据可用时,读取总是。当没有可用数据时,总是将挂起。这意味着事件循环只在我们执行有用的工作时被阻塞。

4.2. 解除事件循环阻塞

就事件循环而言,没有什么变化。要监听的事件仍然与文件事件相同——由select标记为ready的文件描述符。

# old
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
# new
elif isinstance(command, AsyncRead):
waiting_read[command.file] = coroutine
elif isinstance(command, AsyncRecv):
waiting_read[command.connection] = coroutine

此时,AsyncReadAsyncRecv显然是同一类事件。我们可以很容易地将它们重构为具有可交换I/O组件的一个事件。实际上,事件循环、协程和事件干净地分离调度程序、任意中间代码和实际I/O。

4.3. 非阻塞I/O的丑陋一面

原则上,此时你应该做的是将read的逻辑复制为AsyncRecvrecv。然而,现在这要丑陋得多——当函数在内核内阻塞时,你必须处理早期返回,但控制权交给你。例如,打开一个连接比打开一个文件要长得多:

# file
file = open(path, 'rb')
# non-blocking socket
connection = socket.socket()
connection.setblocking(False)
# open without blocking - retry on failure
try:
connection.connect((url, port))
except BlockingIOError:
pass

长话短说,剩下的就是几十行异常处理。此时事件和事件循环已经工作了。

id background round 1
read localhost:25000 at 1530783569
read /dev/urandom at 1530783569
done localhost:25000 at 1530783569 got 32768 B
id background round 2
id background round 3
id background round 4
done /dev/urandom at 1530783569 got 4096 B
id background round 5

齿顶高

github上的示例代码 . b

什么是asyncio?

Asyncio代表异步输入输出,指的是使用单个线程或事件循环实现高并发的编程范例。 异步编程是一种并行编程,其中工作单元允许与主应用程序线程分开运行。当工作完成时,它通知主线程工作线程的完成或失败

让我们看看下图:

asynchronous_flow

让我们用一个例子来理解asyncio:

为了理解asyncio背后的概念,让我们考虑一家只有一个服务员的餐厅。突然,三个顾客,A, B和C出现了。他们三个人从服务员那里拿到菜单后,花了不同的时间来决定吃什么。

我们假设A需要5分钟,B需要10分钟,C需要1分钟来决定。如果单身服务员先从B开始,在10分钟内为B点餐,然后他为A服务,花5分钟记录他点的菜,最后花1分钟知道C想吃什么。 所以,服务员总共要花10 + 5 + 1 = 16分钟来记下他们点的菜。但是,请注意在这一系列事件中,C在服务员到达他之前等了15分钟,A等了10分钟,B等了0分钟 现在考虑一下,如果服务员知道每位顾客做出决定所需要的时间。他可以先从C开始,然后到A,最后到b。这样每个顾客的等待时间为0分钟。 错觉包含三个服务员,即使只有一个,也会创建一个服务于每个顾客的服务员

最后,服务员完成三份订单所需的总时间为10分钟,远少于另一种情况下的16分钟。

让我们来看另一个例子:

假设国际象棋大师马格努斯Carlsen举办了一场国际象棋展览,在展览中他与多名业余棋手一起下棋。他有两种方式进行展览:同步和异步。

假设:

  • 24的对手
  • 马格努斯Carlsen使每一步棋在5秒内走完
  • 每个对手有55秒的时间来移动
  • 游戏平均30对棋(总共60步)

同步:马格努斯·卡尔森一次玩一场游戏,从不同时玩两场,直到游戏完成。每个游戏花费(55 + 5) * 30 == 1800秒,或30分钟。整个展览耗时24 * 30 == 720分钟,或12个小时分钟。

异步:马格努斯·卡尔森从一张桌子移动到另一张桌子,在每张桌子上做一个动作。她离开牌桌,让对手在等待时间内采取下一步行动。Judit在所有24局游戏中的一次移动需要24 * 5 = 120秒,或2分钟秒。整个展示现在被缩减到120 * 30 == 3600秒,或者仅仅是1小时

世界上只有一个马格努斯·卡尔森(Magnus Carlsen),他只有两只手,自己一次只能走一步棋。但异步游戏将展示时间从12小时缩短至1小时。

代码示例:

让我们尝试使用代码片段演示同步和异步执行时间。

异步- async_count.py .py

import asyncio
import time
  

  

async def count():
print("One", end=" ")
await asyncio.sleep(1)
print("Two", end=" ")
await asyncio.sleep(2)
print("Three", end=" ")
  

  

async def main():
await asyncio.gather(count(), count(), count(), count(), count())
  

  

if __name__ == "__main__":
start_time = time.perf_counter()
asyncio.run(main())
end_time = time.perf_counter()
execution_time = end_time - start_time
print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")

异步输出:

One One One One One Two Two Two Two Two Three Three Three Three Three
Executing - async_count.py
Execution Starts: 18453.442160108
Executions Ends: 18456.444719712
Totals Execution Time:3.00 seconds.

Synchronous - sync_count.py .py

import time
  

  

def count():
print("One", end=" ")
time.sleep(1)
print("Two", end=" ")
time.sleep(2)
print("Three", end=" ")
  

  

def main():
for _ in range(5):
count()
  

  

if __name__ == "__main__":
start_time = time.perf_counter()
main()
end_time = time.perf_counter()
execution_time = end_time - start_time
print(f"\nExecuting - {__file__}\nExecution Starts: {start_time}\nExecutions Ends: {end_time}\nTotals Execution Time:{execution_time:0.2f} seconds.")

Synchronous - Output:

One Two Three One Two Three One Two Three One Two Three One Two Three
Executing - sync_count.py
Execution Starts: 18875.175965998
Executions Ends: 18890.189930292
Totals Execution Time:15.01 seconds.

为什么在Python中使用asyncio而不是多线程?

  • 编写线程安全的代码是非常困难的。使用异步代码,您可以确切地知道代码将从一个任务转移到下一个任务的位置,而竞争条件则很难出现。
  • 线程消耗相当数量的数据,因为每个线程都需要有自己的堆栈。使用异步代码,所有代码共享相同的堆栈,由于在任务之间不断地展开堆栈,因此堆栈保持较小。
  • 线程是操作系统结构,因此需要平台支持更多的内存。异步任务不存在这样的问题。

asyncio是如何工作的?

在深入讨论之前,让我们回顾一下Python Generator

Python发电机:

包含yield语句的函数被编译为生成器。在函数体中使用yield表达式会导致该函数成为生成器。这些函数返回一个支持迭代协议方法的对象。自动创建的生成器对象接收__next()__方法。回到上一节的例子,我们可以直接在生成器对象上调用__next__,而不是使用next():

def asynchronous():
yield "Educative"




if __name__ == "__main__":
gen = asynchronous()


str = gen.__next__()
print(str)

请记住以下关于生成器的内容:

  • 生成器函数允许您延迟计算昂贵的值。只有在需要时才计算下一个值。这使得生成器的内存和计算效率高;它们避免在内存中保存长序列或预先进行所有昂贵的计算。
  • 生成器在挂起时,保留代码位置(即执行的最后一个yield语句)及其整个局部作用域。这允许他们从中断的地方恢复执行。
  • 生成器对象只不过是迭代器。
  • 记住要区分生成器函数和相关联的生成器对象,它们通常可以互换使用。生成器函数在被调用时返回一个生成器对象,并且在生成器对象上调用next()以运行生成器函数中的代码。

发电机状态:

生成器会经历以下几种状态:

  • 当生成器函数第一次返回生成器对象且迭代尚未开始时,GEN_CREATED
  • GEN_RUNNING当next已在生成器对象上调用,并由python解释器执行。
  • GEN_SUSPENDED当发电机暂停在一个产量
  • GEN_CLOSED当生成器完成执行或关闭时。

generator_cycle

生成器对象上的方法:

生成器对象公开了可以调用来操作生成器的不同方法。这些都是:

  • throw()
  • send()
  • close()

让我们深入了解更多细节

asyncio的规则:

  • 语法async def引入了本地协同程序异步发电机。表达式async withasync for也是有效的。
  • 关键字await将函数控制传递回事件循环。(它暂停周围协程的执行。)如果Python在g()的作用域内遇到await f()表达式,这就是await告诉事件循环的方式,“暂停g()的执行,直到我正在等待的__abc5的结果返回。”与此同时,让别的事情去吧。”

在代码中,第二个要点大致如下所示:

async def g():
# Pause here and come back to g() when f() is ready
r = await f()
return r

关于何时以及如何使用async/await也有一组严格的规则。无论你是否还在学习语法,或者已经使用过async/await,这些都很方便:

    async def引入的函数是协程。它可以使用awaitreturnyield,但所有这些都是可选的。声明async def noop(): pass是有效的:
    • 使用await和/或return创建协程函数。要调用协程函数,你必须await它来获得它的结果。
    • async def块中使用yield不太常见。这将创建一个异步发电机,你可以用async for遍历它。暂时忘掉异步生成器,集中精力记下协程函数的语法,协程函数使用await和/或return
    • 任何用async def定义的东西都不能使用yield from,这会引发SyntaxError
  • 就像在def函数之外使用yieldSyntaxError一样,在async def协程之外使用await也是SyntaxError。你只能在协程体中使用await

以下是一些简短的例子,旨在总结上述几条规则:

async def f(x):
y = await z(x)     # OK - `await` and `return` allowed in coroutines
return y


async def g(x):
yield x            # OK - this is an async generator


async def m(x):
yield from gen(x)  # NO - SyntaxError


def m(x):
y = await z(x)     # NO - SyntaxError (no `async def` here)
return y

基于生成器的协程

Python创建了Python生成器和用于协程的生成器之间的区别。这些协程被称为基于生成器的协程,并且要求将装饰器@asynio.coroutine添加到函数定义中,尽管这并不是严格强制的。

基于生成器的协程使用yield from语法而不是yield语法。协程可以:

  • 屈服于另一个协程
  • 未来收益
  • 返回一个表达式
  • 提高异常

Python中的协程使协作多任务成为可能。 协作多任务处理是指正在运行的进程主动将CPU让给其他进程的方法。当一个进程在逻辑上被阻塞时,比如在等待用户输入时,或者当它发起了一个网络请求并将空闲一段时间时,它可能会这样做。 协程可以定义为一个特殊的函数,它可以在不失去状态的情况下将控制权交给调用者

那么协程和生成器之间的区别是什么呢?

生成器本质上是迭代器,尽管它们看起来像函数。一般来说,生成器和协程之间的区别是:

  • 生成器将一个值返回给调用者,而协程将控制权交还给另一个协程,并且可以从它放弃控制权开始恢复执行。
  • 一旦启动,生成器就不能接受参数,而协程可以。
  • 生成器主要用于简化迭代器的编写。它们是一种协程,有时也称为半协程。

基于生成器的协程示例

我们可以编写的最简单的基于生成器的协程如下所示:

@asyncio.coroutine
def do_something_important():
yield from asyncio.sleep(1)

协程休眠一秒。注意装饰器和yield from的使用。

本地基于协程示例

原生的意思是该语言引入了专门定义协程的语法,使它们成为该语言的第一类公民。本地协程可以使用async/await语法定义。 我们可以编写的最简单的基于本机的协程如下:

async def do_something_important():
await asyncio.sleep(1)

AsyncIO设计模式

AsyncIO有自己的一组可能的脚本设计,我们将在本节中讨论。

1. 事件循环

事件循环是一种编程构造,它等待事件发生,然后将它们分派给事件处理程序。事件可以是用户单击UI按钮,也可以是启动文件下载的进程。异步编程的核心是事件循环。

示例代码:

import asyncio
import random
import time
from threading import Thread
from threading import current_thread
  

# ANSI colors
colors = (
"\033[0m",   # End of color
"\033[31m",  # Red
"\033[32m",  # Green
"\033[34m",  # Blue
)
  

  

async def do_something_important(sleep_for):
print(colors[1] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])
await asyncio.sleep(sleep_for)
  

  

def launch_event_loops():
# get a new event loop
loop = asyncio.new_event_loop()
  

# set the event loop for the current thread
asyncio.set_event_loop(loop)
  

# run a coroutine on the event loop
loop.run_until_complete(do_something_important(random.randint(1, 5)))
  

# remember to close the loop
loop.close()
  

  

if __name__ == "__main__":
thread_1 = Thread(target=launch_event_loops)
thread_2 = Thread(target=launch_event_loops)
  

start_time = time.perf_counter()
thread_1.start()
thread_2.start()
  

print(colors[2] + f"Is event loop running in thread {current_thread().getName()} = {asyncio.get_event_loop().is_running()}" + colors[0])
  

thread_1.join()
thread_2.join()
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[3] + f"Event Loop Start Time: {start_time}\nEvent Loop End Time: {end_time}\nEvent Loop Execution Time: {execution_time:0.2f} seconds." + colors[0])

执行命令: python async_event_loop.py

输出:

async_event_loop

自己尝试并检查输出,您会发现每个衍生线程都在运行自己的事件循环。

事件循环的类型

有两种类型的事件循环:

  • SelectorEventLoop: SelectorEventLoop基于selectors模块,是所有平台上的默认循环。
  • ProactorEventLoop: ProactorEventLoop基于Windows的I/O完成端口,仅在Windows上支持。

2. 期货

Future表示正在进行或将在未来被调度的计算。它是一个特殊的低级可等待对象,表示异步操作的最终结果。不要混淆threading.Futureasyncio.Future

示例代码:

import time
import asyncio
from asyncio import Future
  

# ANSI colors
colors = (
"\033[0m",   # End of color
"\033[31m",  # Red
"\033[32m",  # Green
"\033[34m",  # Blue
)
  

  

async def bar(future):
print(colors[1] + "bar will sleep for 3 seconds" + colors[0])
await asyncio.sleep(3)
print(colors[1] + "bar resolving the future" + colors[0])
future.done()
future.set_result("future is resolved")
  

  

async def foo(future):
print(colors[2] + "foo will await the future" + colors[0])
await future
print(colors[2] + "foo finds the future resolved" + colors[0])
  

  

async def main():
future = Future()
await asyncio.gather(foo(future), bar(future))
  

  

if __name__ == "__main__":
start_time = time.perf_counter()
asyncio.run(main())
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])

执行命令: python async_futures.py

输出:

async_futures

两个协程都传递了一个future。foo()协程等待future被解析,而bar()协程在三秒后解析future。

3.任务

任务就像未来,事实上,任务是未来的一个子类,可以使用以下方法创建:

  • asyncio.create_task()接受协程并将它们包装为任务。
  • loop.create_task()只接受协程。
  • asyncio.ensure_future()接受期货、协程和任何可等待对象。

任务包装协程并在事件循环中运行它们。如果一个协程在等待一个Future, Task将挂起该协程的执行并等待Future完成。当Future完成时,将继续执行封装的协程。

示例代码:

import time
import asyncio
from asyncio import Future
  

# ANSI colors
colors = (
"\033[0m",   # End of color
"\033[31m",  # Red
"\033[32m",  # Green
"\033[34m",  # Blue
)
  

  

async def bar(future):
print(colors[1] + "bar will sleep for 3 seconds" + colors[0])
await asyncio.sleep(3)
print(colors[1] + "bar resolving the future" + colors[0])
future.done()
future.set_result("future is resolved")
  

  

async def foo(future):
print(colors[2] + "foo will await the future" + colors[0])
await future
print(colors[2] + "foo finds the future resolved" + colors[0])
  

  

async def main():
future = Future()
  

loop = asyncio.get_event_loop()
t1 = loop.create_task(bar(future))
t2 = loop.create_task(foo(future))
  

await t2, t1
  

  

if __name__ == "__main__":
start_time = time.perf_counter()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[3] + f"Future Start Time: {start_time}\nFuture End Time: {end_time}\nFuture Execution Time: {execution_time:0.2f} seconds." + colors[0])

执行命令: python async_tasks.py

输出:

async_tasks

4. 链协同程序:

协程的一个关键特性是它们可以被链接在一起。协程对象是可等待的,所以另一个协程可以await它。这允许你把程序分解成更小的、可管理的、可回收的协程:

示例代码:

import sys
import asyncio
import random
import time
  

# ANSI colors
colors = (
"\033[0m",  # End of color
"\033[31m",  # Red
"\033[32m",  # Green
"\033[36m",  # Cyan
"\033[34m",  # Blue
)
  

  

async def function1(n: int) -> str:
i = random.randint(0, 10)
print(colors[1] + f"function1({n}) is sleeping for {i} seconds." + colors[0])
await asyncio.sleep(i)
result = f"result{n}-1"
print(colors[1] + f"Returning function1({n}) == {result}." + colors[0])
return result
  

  

async def function2(n: int, arg: str) -> str:
i = random.randint(0, 10)
print(colors[2] + f"function2{n, arg} is sleeping for {i} seconds." + colors[0])
await asyncio.sleep(i)
result = f"result{n}-2 derived from {arg}"
print(colors[2] + f"Returning function2{n, arg} == {result}." + colors[0])
return result
  

  

async def chain(n: int) -> None:
start = time.perf_counter()
p1 = await function1(n)
p2 = await function2(n, p1)
end = time.perf_counter() - start
print(colors[3] + f"--> Chained result{n} => {p2} (took {end:0.2f} seconds)." + colors[0])
  

  

async def main(*args):
await asyncio.gather(*(chain(n) for n in args))
  

  

if __name__ == "__main__":
random.seed(444)
args = [1, 2, 3] if len(sys.argv) == 1 else map(int, sys.argv[1:])
start_time = time.perf_counter()
asyncio.run(main(*args))
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

仔细注意输出,其中function1()休眠了一段可变的时间,而function2()在结果可用时开始工作:

执行命令: python async_chained.py 11 8 5

输出:

async_chained

5. 使用队列:

在这种设计中,没有任何个体消费者与生产者之间的链接。消费者不知道生产者的数量,甚至不知道将被添加到队列中的项目的累积数量。

单个生产者或消费者分别花费不同的时间从队列中放置和提取项目。队列作为一个吞吐量,可以与生产者和消费者进行通信,而不需要它们彼此直接通信。

示例代码:

import asyncio
import argparse
import itertools as it
import os
import random
import time
  

# ANSI colors
colors = (
"\033[0m",  # End of color
"\033[31m",  # Red
"\033[32m",  # Green
"\033[36m",  # Cyan
"\033[34m",  # Blue
)
  

  

async def generate_item(size: int = 5) -> str:
return os.urandom(size).hex()
  

  

async def random_sleep(caller=None) -> None:
i = random.randint(0, 10)
if caller:
print(colors[1] + f"{caller} sleeping for {i} seconds." + colors[0])
await asyncio.sleep(i)
  

  

async def produce(name: int, producer_queue: asyncio.Queue) -> None:
n = random.randint(0, 10)
for _ in it.repeat(None, n):  # Synchronous loop for each single producer
await random_sleep(caller=f"Producer {name}")
i = await generate_item()
t = time.perf_counter()
await producer_queue.put((i, t))
print(colors[2] + f"Producer {name} added <{i}> to queue." + colors[0])
  

  

async def consume(name: int, consumer_queue: asyncio.Queue) -> None:
while True:
await random_sleep(caller=f"Consumer {name}")
i, t = await consumer_queue.get()
now = time.perf_counter()
print(colors[3] + f"Consumer {name} got element <{i}>" f" in {now - t:0.5f} seconds." + colors[0])
consumer_queue.task_done()
  

  

async def main(no_producer: int, no_consumer: int):
q = asyncio.Queue()
producers = [asyncio.create_task(produce(n, q)) for n in range(no_producer)]
consumers = [asyncio.create_task(consume(n, q)) for n in range(no_consumer)]
await asyncio.gather(*producers)
await q.join()  # Implicitly awaits consumers, too
for consumer in consumers:
consumer.cancel()
  

  

if __name__ == "__main__":
random.seed(444)
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--no_producer", type=int, default=10)
parser.add_argument("-c", "--no_consumer", type=int, default=15)
ns = parser.parse_args()
start_time = time.perf_counter()
asyncio.run(main(**ns.__dict__))
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[4] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

执行命令: python async_queue.py -p 2 -c 4

输出:

async_queue

最后,让我们看一个asyncio如何减少等待时间的例子:给定一个协程generate_random_int(),它不断生成范围为[0,10]的随机整数,直到其中一个超出阈值,你想让这个协程的多个调用不需要彼此连续等待完成。

示例代码:

import time
import asyncio
import random
  

# ANSI colors
colors = (
"\033[0m",   # End of color
"\033[31m",  # Red
"\033[32m",  # Green
"\033[36m",  # Cyan
"\033[35m",  # Magenta
"\033[34m",  # Blue
)
  

  

async def generate_random_int(indx: int, threshold: int = 5) -> int:
print(colors[indx + 1] + f"Initiated generate_random_int({indx}).")
i = random.randint(0, 10)
while i <= threshold:
print(colors[indx + 1] + f"generate_random_int({indx}) == {i} too low; retrying.")
await asyncio.sleep(indx + 1)
i = random.randint(0, 10)
print(colors[indx + 1] + f"---> Finished: generate_random_int({indx}) == {i}" + colors[0])
return i
  

  

async def main():
res = await asyncio.gather(*(generate_random_int(i, 10 - i - 1) for i in range(3)))
return res
  

  

if __name__ == "__main__":
random.seed(444)
start_time = time.perf_counter()
r1, r2, r3 = asyncio.run(main())
print(colors[4] + f"\nRandom INT 1: {r1}, Random INT 2: {r2}, Random INT 3: {r3}\n" + colors[0])
end_time = time.perf_counter()
execution_time = end_time - start_time
print(colors[5] + f"Program Start Time: {start_time}\nProgram End Time: {end_time}\nProgram Execution Time: {execution_time:0.2f} seconds." + colors[0])

执行命令: python async_random.py

输出:

async_random

如果你自己编写任何代码,更喜欢本地协程 为了明确而不是含蓄。发电机的基础 协程将在Python 3.10中被移除

GitHub Repo: https://github.com/tssovi/asynchronous-in-python

它允许您编写单线程异步代码,并在Python中实现并发性。基本上,asyncio为异步编程提供了一个事件循环。例如,如果我们需要在不阻塞主线程的情况下发出请求,我们可以使用asyncio库。

asyncio模块允许实现异步编程 使用以下元素的组合:

  • 事件循环:asyncio模块允许每个进程有一个事件循环。

  • 协程:协程是遵循某些约定的生成器。它最有趣的特性是,它可以在执行期间挂起以等待外部处理(I/O中的某些例程),并在外部处理完成时从它停止的点返回。

  • 期货:期货代表一个尚未完成的过程。future是一个对象,它应该在未来有一个结果,并表示未完成的任务。

  • 任务:这是asyncio的子类。封装和管理的Future 协同程序。我们可以使用asyncio。封装协程的任务对象

asyncio中最重要的概念是事件循环。事件循环 允许您使用回调或协程编写异步代码。 理解asyncio的关键是协程和事件的术语 循环。协同程序是有状态函数,它的执行可以在另一个I/O操作正在执行时停止。事件循环用于协调协同例程的执行 要运行任何协程函数,我们需要获得一个事件循环。我们可以这样做 < / p >
    loop = asyncio.get_event_loop()

这给了我们一个BaseEventLoop对象。它有一个run_until_complete方法,它接受一个协程并运行它直到完成。然后,协程返回一个结果。在较低级别,事件循环执行BaseEventLoop.rununtilcomplete(future)方法。

如果你想象一个机场控制塔,许多飞机等待降落在同一条跑道上。控制塔可以被看作是事件循环,跑道是线程。每个平面都是一个等待执行的独立函数。事实上,一次只能有一架飞机降落在跑道上。asyncio主要做什么呢?它通过使用事件循环挂起函数来允许许多飞机同时降落在同一跑道上,并允许其他函数运行,当你使用await语法时,它基本上意味着飞机(函数可以挂起并允许其他函数处理!