为生产者-消费者流使用异步.Queue

我对于如何将 asyncio.Queue用于特定的生产者-消费者模式感到困惑,在这种模式中,生产者和消费者并发和独立地进行操作。

首先,考虑这个例子,它紧跟着 asyncio.Queue的文件的例子:

import asyncio
import random
import time


async def worker(name, queue):
while True:
sleep_for = await queue.get()
await asyncio.sleep(sleep_for)
queue.task_done()
print(f'{name} has slept for {sleep_for:0.2f} seconds')


async def main(n):
queue = asyncio.Queue()
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
tasks = []
for i in range(n):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


if __name__ == '__main__':
import sys
n = 3 if len(sys.argv) == 1 else sys.argv[1]
asyncio.run(main())

关于这个脚本还有一个更好的细节: 通过 queue.put_nowait(sleep_for)超过常规的 for 循环,将项同步放入队列中。

我的目标是创建一个使用 async def worker()(或 consumer())和 async def producer()的脚本。两者都应该被安排并发运行。没有一个消费者协同程序被显式地绑定到或链接到生产者。

如何修改上述程序,使生产者成为自己的协同程序,可以与使用者/工作者并发调度?


还有来自 PYMOTW的第二个例子。它要求生产者提前知道消费者的数量,并使用 None作为消费者生产完成的信号。

57822 次浏览

如何修改上述程序,使生产者成为自己的协同程序,可以与使用者/工作者并发调度?

这个例子可以概括而不改变其基本逻辑:

  • 将插入循环移动到单独的生成器协同程序。
  • 在后台启动消费者,让他们在产品生产时处理产品。
  • 随着消费者的运行,启动生产者并等待他们完成生产项目,如 await producer()await gather(*producers)等。
  • 所有生产者完成后,等待消费者使用 await queue.join()处理剩余的项目。
  • 取消消费者,所有消费者现在都在无所事事地等待队列交付下一个商品,因为我们知道生产者已经完成了,这个商品永远不会到达。

下面是一个实现上述内容的例子:

import asyncio, random
 

async def rnd_sleep(t):
# sleep for T seconds on average
await asyncio.sleep(t * random.random() * 2)
 

async def producer(queue):
while True:
# produce a token and send it to a consumer
token = random.random()
print(f'produced {token}')
if token < .05:
break
await queue.put(token)
await rnd_sleep(.1)
 

async def consumer(queue):
while True:
token = await queue.get()
# process the token received from a producer
await rnd_sleep(.3)
queue.task_done()
print(f'consumed {token}')
 

async def main():
queue = asyncio.Queue()
 

# fire up the both producers and consumers
producers = [asyncio.create_task(producer(queue))
for _ in range(3)]
consumers = [asyncio.create_task(consumer(queue))
for _ in range(10)]
 

# with both producers and consumers running, wait for
# the producers to finish
await asyncio.gather(*producers)
print('---- done producing')
 

# wait for the remaining tasks to be processed
await queue.join()
 

# cancel the consumers, which are now idle
for c in consumers:
c.cancel()
 

asyncio.run(main())

请注意,在实际的生产者和消费者中,特别是那些涉及网络访问的生产者和消费者中,您可能希望捕获处理过程中发生的 IO 相关异常。如果异常是可恢复的,就像大多数与网络相关的异常一样,那么您可以简单地捕获异常并记录错误。您仍然应该调用 task_done(),因为否则 queue.join()将由于未处理的项而挂起。如果重新尝试处理该项是有意义的,那么可以在调用 task_done()之前将其返回到队列中。例如:

# like the above, but handling exceptions during processing:
async def consumer(queue):
while True:
token = await queue.get()
try:
# this uses aiohttp or whatever
await process(token)
except aiohttp.ClientError as e:
print(f"Error processing token {token}: {e}")
# If it makes sense, return the token to the queue to be
# processed again. (You can use a counter to avoid
# processing a faulty token infinitely.)
#await queue.put(token)
queue.task_done()
print(f'consumed {token}')