最佳答案
我对于如何将 asyncio.Queue
用于特定的生产者-消费者模式感到困惑,在这种模式中,生产者和消费者并发和独立地进行操作。
首先,考虑这个例子,它紧跟着 asyncio.Queue
的文件的例子:
import asyncio
import random
import time
async def worker(name, queue):
while True:
sleep_for = await queue.get()
await asyncio.sleep(sleep_for)
queue.task_done()
print(f'{name} has slept for {sleep_for:0.2f} seconds')
async def main(n):
queue = asyncio.Queue()
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
tasks = []
for i in range(n):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
if __name__ == '__main__':
import sys
n = 3 if len(sys.argv) == 1 else sys.argv[1]
asyncio.run(main())
关于这个脚本还有一个更好的细节: 通过 queue.put_nowait(sleep_for)
超过常规的 for 循环,将项同步放入队列中。
我的目标是创建一个使用 async def worker()
(或 consumer()
)和 async def producer()
的脚本。两者都应该被安排并发运行。没有一个消费者协同程序被显式地绑定到或链接到生产者。
如何修改上述程序,使生产者成为自己的协同程序,可以与使用者/工作者并发调度?
还有来自 PYMOTW的第二个例子。它要求生产者提前知道消费者的数量,并使用 None
作为消费者生产完成的信号。