将异步与多处理相结合会出现什么样的问题(如果有的话) ?

几乎每个人在第一次使用 Python 进行线程处理时都意识到,对于那些真正想要并行处理(或者至少给它一个机会)的人来说,GIL 让他们的生活变得很糟糕。

我目前正在考虑实现类似反应堆模式的东西。实际上,我希望在一个类线程上侦听传入的套接字连接,当有人尝试连接时,接受该连接并将其传递给另一个类线程进行处理。

我还不确定我将面临什么样的负担。我知道目前对传入消息设置了2MB 的上限。理论上我们每秒可以得到几千个(虽然我不知道实际上我们是否看到过类似的情况)。处理消息所花费的时间并不重要,尽管显然更快会更好。

我正在研究反应器模式,并开发了一个使用 multiprocessing库的小例子,(至少在测试中)似乎工作得很好。但是,现在/不久我们将有 异步库可用,它将为我处理事件循环。

asynciomultiprocessing结合有什么东西可以咬我吗?

34520 次浏览

是的,有相当多的东西可能会(也可能不会)咬你。

  • 当您运行类似 asyncio的程序时,它希望在一个线程或进程上运行。这本身并不适用于并行处理。您必须以某种方式分发工作,同时将 IO 操作(特别是套接字上的操作)留在单个线程/进程中。
  • 虽然将单个连接移交给不同处理程序进程的想法不错,但是很难实现。第一个障碍是,您需要一种方法在不关闭 asyncio的情况下将连接从 asyncio中拉出。下一个障碍是,除非使用 C 扩展中的特定于平台(可能是 Linux)的代码,否则不能简单地将文件描述符发送到不同的进程。
  • 请注意,已知 multiprocessing模块创建了许多用于通信的线程。大多数情况下,当您使用通信结构(例如 Queue)时,会产生一个线程。不幸的是,这些线程并不是完全不可见的。例如,它们可能无法干净利落地拆除(当您打算终止程序时) ,但是根据它们的数量,资源使用本身可能是显而易见的。

如果您真的打算在单个流程中处理单个连接,我建议检查不同的方法。例如,您可以将套接字置于侦听模式,然后同时并行地接受来自多个辅助进程的连接。一旦工作者完成了对请求的处理,它就可以接受下一个连接,因此与为每个连接分叉进程相比,您仍然使用了更少的资源。例如,垃圾邮件杀手和 Apache (mpm prefork)可以使用这个工作者模型。根据您的用例,它最终可能更容易、更健壮。具体来说,您可以使您的工作者在服务了配置数量的请求之后死亡,并由一个主进程重新生成,从而消除了内存泄漏的许多负面影响。

参见 PEP 3156,特别是关于线程交互的部分:

Http://www.python.org/dev/peps/pep-3156/#thread-interaction

这清楚地记录了您可能使用的新的异步方法,包括 run _ in _ Executive ()。请注意,Execator 是在 concurrent.future 中定义的,我建议您在那里也可以查看一下。

您应该能够安全地组合 asynciomultiprocessing而不会遇到太多麻烦,尽管您不应该直接使用 multiprocessingasyncio(以及任何其他基于事件循环的异步框架)的主要缺点是阻塞了事件循环。如果您尝试直接使用 multiprocessing,那么无论何时您阻塞等待子进程,您都将阻塞事件循环。很明显,情况不妙。

避免这种情况的最简单方法是使用 BaseEventLoop.run_in_executorconcurrent.futures.ProcessPoolExecutor中执行函数。ProcessPoolExecutor是一个使用 multiprocessing.Process实现的进程池,但是 asyncio内置支持在其中执行函数,而不会阻塞事件循环。这里有一个简单的例子:

import time
import asyncio
from concurrent.futures import ProcessPoolExecutor


def blocking_func(x):
time.sleep(x) # Pretend this is expensive calculations
return x * 5


@asyncio.coroutine
def main():
#pool = multiprocessing.Pool()
#out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
executor = ProcessPoolExecutor()
out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
print(out)


if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

对于大多数情况来说,这是函数本身就足够好了。如果您发现自己需要 multiprocessing中的其他结构,如 QueueEventManager等,有一个名为 aioprocessing的第三方库(完全公开: 我编写了它) ,它提供所有 multiprocessing数据结构的兼容 asyncio版本。这里有一个例子:

import time
import asyncio
import aioprocessing
import multiprocessing


def func(queue, event, lock, items):
with lock:
event.set()
for item in items:
time.sleep(3)
queue.put(item+5)
queue.close()


@asyncio.coroutine
def example(queue, event, lock):
l = [1,2,3,4,5]
p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l))
p.start()
while True:
result = yield from queue.coro_get()
if result is None:
break
print("Got result {}".format(result))
yield from p.coro_join()


@asyncio.coroutine
def example2(queue, event, lock):
yield from event.coro_wait()
with (yield from lock):
yield from queue.coro_put(78)
yield from queue.coro_put(None) # Shut down the worker


if __name__ == "__main__":
loop = asyncio.get_event_loop()
queue = aioprocessing.AioQueue()
lock = aioprocessing.AioLock()
event = aioprocessing.AioEvent()
tasks = [
asyncio.async(example(queue, event, lock)),
asyncio.async(example2(queue, event, lock)),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

基于@dano 上面的回答,我编写了这个函数来替换我过去使用多进程池 + 映射的位置。

def asyncio_friendly_multiproc_map(fn: Callable, l: list):
"""
This is designed to replace the use of this pattern:
with multiprocessing.Pool(5) as p:
results = p.map(analyze_day, list_of_days)
By letting caller drop in replace:
asyncio_friendly_multiproc_map(analyze_day, list_of_days)
"""
tasks = []
with ProcessPoolExecutor(5) as executor:
for e in l:
tasks.append(asyncio.get_event_loop().run_in_executor(executor, fn, e))
res = asyncio.get_event_loop().run_until_complete(asyncio.gather(*tasks))
return res