在几个进程之间共享结果队列

multiprocessing模块的文档显示了如何将队列传递给以 multiprocessing.Process启动的进程。但是我如何能够与用 apply_async启动的异步工作进程共享一个队列呢?我不需要动态加入或其他任何东西,只是一种方式,为工人(重复)报告他们的结果回基地。

import multiprocessing
def worker(name, que):
que.put("%d is done" % name)


if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
q = multiprocessing.Queue()
workers = pool.apply_async(worker, (33, q))

这种做法失败的原因是: RuntimeError: Queue objects should only be shared between processes through inheritance. 我理解这意味着什么,也理解继承而不是要求 pickle/unpickle (以及所有特殊的 Windows 限制)的建议。但是 do如何以一种有效的方式传递队列呢?我找不到一个例子,而且我已经尝试了几种方法,但都以各种方式失败了。救命啊?

76474 次浏览

尝试使用 多处理器,经理来管理队列,并让不同的工作者可以访问它。

import multiprocessing
def worker(name, que):
que.put("%d is done" % name)


if __name__ == '__main__':
pool = multiprocessing.Pool(processes=3)
m = multiprocessing.Manager()
q = m.Queue()
workers = pool.apply_async(worker, (33, q))

multiprocessing.Pool已经有一个共享的结果队列,不需要额外涉及 Manager.QueueManager.Queue是底层的 queue.Queue(多线程队列) ,位于单独的服务器进程上,通过代理公开。与 Pool 的内部队列相比,这会增加额外的开销。与依赖 Pool 的本机结果处理相反,Manager.Queue中的结果也不能保证被订购。

辅助进程是用 .apply_async()启动的 没有,这在实例化 Pool时已经发生了 当你调用 pool.apply_async()是一个新的“工作”。Pool 的 worker-process 在底层运行 multiprocessing.pool.worker-函数。这个函数负责处理通过 Pool 的内部 Pool._inqueue传输的新“任务”,并通过 Pool._outqueue将结果发送回父级。指定的 func将在 multiprocessing.pool.worker中执行。func只需要向 return发送一些东西,结果就会自动发送回父代。

.apply_async() immediately(异步)返回 AsyncResult对象(ApplyResult的别名)。您需要对该对象调用 .get()(正在阻塞)来接收实际结果。另一种选择是注册一个 复试函数,一旦结果准备就绪,就会触发该函数。

from multiprocessing import Pool


def busy_foo(i):
"""Dummy function simulating cpu-bound work."""
for _ in range(int(10e6)):  # do stuff
pass
return i


if __name__ == '__main__':


with Pool(4) as pool:
print(pool._outqueue)  # DEMO
results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
# `.apply_async()` immediately returns AsyncResult (ApplyResult) object
print(results[0])  # DEMO
results = [res.get() for res in results]
print(f'result: {results}')

示例输出:

<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

注意: 为 .get()指定 timeout参数并不会停止 worker 中任务的实际处理,它只是通过引发 multiprocessing.TimeoutError来解除等待的父级的阻塞。