Multiprocessing - Pipe vs Queue

What are the fundamental differences between queues and pipes in Python's multiprocessing package?

In what scenarios should one choose one over the other? When is it advantageous to use Pipe()? When is it advantageous to use Queue()?

  • A Pipe() can only have two endpoints.

  • A Queue() can have multiple producers and consumers.

When to use them

If you need more than two points to communicate, use a Queue().

If you need absolute performance, a Pipe() is much faster, because Queue() is built on top of Pipe().
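To illustrate the multiple-producers point above, here is a minimal sketch (my own illustration, not one of the benchmark scripts below) in which several processes put() into one shared Queue() and a single consumer drains them all:

```python
from multiprocessing import Process, Queue


def producer(queue, name, count):
    # Any number of processes can safely put() on the same Queue()
    for ii in range(count):
        queue.put((name, ii))


if __name__ == '__main__':
    q = Queue()
    procs = [Process(target=producer, args=(q, "p{0}".format(nn), 3))
             for nn in range(4)]
    for pp in procs:
        pp.start()
    # One consumer drains the items from all four producers
    items = [q.get() for _ in range(4 * 3)]
    for pp in procs:
        pp.join()
    print(len(items))   # 12
```

Doing the same with Pipe() would require one pipe per producer and a select loop on the consumer side, which is exactly the bookkeeping Queue() hides from you.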

Performance benchmarking

Let's assume you want to spawn two processes and send messages between them as quickly as possible. These are the timing results of a drag race between similar tests using Pipe() and Queue()...

FYI, results for SimpleQueue() and JoinableQueue() are thrown in as a bonus.

  • JoinableQueue() accounts for tasks when queue.task_done() is called (it doesn't even know about the specific task, it just counts unfinished tasks in the queue), so that queue.join() knows the work is finished.
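That task-counting behavior can be sketched like this (a hypothetical minimal example using a thread as the consumer, not one of the benchmark scripts):

```python
from multiprocessing import JoinableQueue
import threading


def consumer(jqueue):
    while True:
        item = jqueue.get()
        jqueue.task_done()   # decrement the queue's unfinished-task counter
        if item is None:     # sentinel value: stop consuming
            break


if __name__ == '__main__':
    jq = JoinableQueue()
    threading.Thread(target=consumer, args=(jq,), daemon=True).start()
    for ii in range(5):
        jq.put(ii)
    jq.put(None)
    jq.join()   # returns only after task_done() was called for every put()
    print("all tasks done")
```

jq.join() does not care which items were processed, only that task_done() was called once for each item that was put().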

The code for each is at the bottom of this answer...

# This is on a Thinkpad T430, VMWare running Debian 11 VM, and Python 3.9.2


$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.14316844940185547 seconds
Sending 100000 numbers to Pipe() took 1.3749017715454102 seconds
Sending 1000000 numbers to Pipe() took 14.252539157867432 seconds
$  python multi_queue.py
Sending 10000 numbers to Queue() took 0.17014789581298828 seconds
Sending 100000 numbers to Queue() took 1.7723784446716309 seconds
Sending 1000000 numbers to Queue() took 17.758610725402832 seconds
$ python multi_simplequeue.py
Sending 10000 numbers to SimpleQueue() took 0.14937686920166016 seconds
Sending 100000 numbers to SimpleQueue() took 1.5389132499694824 seconds
Sending 1000000 numbers to SimpleQueue() took 16.871352910995483 seconds
$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.15144729614257812 seconds
Sending 100000 numbers to JoinableQueue() took 1.567549228668213 seconds
Sending 1000000 numbers to JoinableQueue() took 16.237736225128174 seconds






# This is on a Thinkpad T430, VMWare running Debian 11 VM, and Python 3.7.0


(py37_test) [mpenning@mudslide ~]$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.13469791412353516 seconds
Sending 100000 numbers to Pipe() took 1.5587594509124756 seconds
Sending 1000000 numbers to Pipe() took 14.467186689376831 seconds
(py37_test) [mpenning@mudslide ~]$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.1897726058959961 seconds
Sending 100000 numbers to Queue() took 1.7622203826904297 seconds
Sending 1000000 numbers to Queue() took 16.89015531539917 seconds
(py37_test) [mpenning@mudslide ~]$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.2238149642944336 seconds
Sending 100000 numbers to JoinableQueue() took 1.4744081497192383 seconds
Sending 1000000 numbers to JoinableQueue() took 15.264554023742676 seconds




# This is on a ThinkpadT61 running Ubuntu 11.10, and Python 2.7.2


mpenning@mpenning-T61:~$ python multi_pipe.py
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpenning@mpenning-T61:~$ python multi_joinablequeue.py
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$

In summary:

  • Under Python 2.7, Pipe() is about 300% faster than Queue(). Don't even think about JoinableQueue() unless you really must have the benefits.
  • Under Python 3.x, Pipe() still has an edge of roughly 20% over Queue(), but the performance gap between Pipe() and Queue() is not as dramatic as it was under Python 2.7. The various Queue() implementations are within roughly 15% of each other. Also, my tests use integer data. Some people commented that they found performance differences depending on the data types being sent through multiprocessing.

Bottom line for Python 3.x: YMMV... consider running your own tests with your own data types (i.e. integers / strings / objects) to form conclusions about the platforms and use cases you care about.

I should also mention that my Python 3.x performance tests were inconsistent and varied somewhat. I ran multiple tests over several minutes to get the best results for each case. I suspect these variations have something to do with running my Python 3 tests under VMware / virtualization; however, the virtualization diagnosis is speculation.

Responding to comments about testing methodology

In the comments, @JJC said:

A fairer comparison would be running N workers, each communicating with the main thread via point-to-point pipes, versus the performance of running N workers all pulling from a single point-to-multipoint queue.

Originally, this answer only considered the performance of one worker and one producer; that is the baseline use case for Pipe(). Your comment requires adding different tests for multiple worker processes. While this is a valid observation for common Queue() use cases, it could easily explode the test matrix along an entirely new axis (i.e. adding tests with varying numbers of worker processes).

Bonus material 2

Multiprocessing introduces subtle changes in information flow that make debugging hard unless you know some shortcuts. For instance, you might have a script that works fine when indexing through a dictionary under many conditions, but infrequently fails with certain inputs.

Normally we get clues to the failure when the entire python process crashes; however, you don't get unsolicited crash tracebacks printed to the console when a multiprocessing function crashes. Tracking down unknown multiprocessing crashes is hard without a clue to what crashed the process.

The simplest way I have found to track down multiprocessing crash information is to wrap the entire multiprocessing function in a try / except and use traceback.print_exc():

import traceback


def run(self, args):
    try:
        # Insert stuff to be multiprocessed here
        return args[0]['that']
    except:
        print("FATAL: reader({0}) exited while multiprocessing".format(args))
        traceback.print_exc()

Now, when you find a crash, you see something like this:

FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
  File "foo.py", line 19, in __init__
    self.run(args)
  File "foo.py", line 46, in run
KeyError: 'that'

Source code:


"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time


def reader_proc(pipe):
## Read from the pipe; this will be spawned as a separate Process
p_output, p_input = pipe
p_input.close()    # We are only reading
while True:
msg = p_output.recv()    # Read from the output pipe and do nothing
if msg=='DONE':
break


def writer(count, p_input):
for ii in range(0, count):
p_input.send(ii)             # Write 'count' numbers into the input pipe
p_input.send('DONE')


if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
# Pipes are unidirectional with two endpoints:  p_input ------> p_output
p_output, p_input = Pipe()  # writer() writes to p_input from _this_ process
reader_p = Process(target=reader_proc, args=((p_output, p_input),))
reader_p.daemon = True
reader_p.start()     # Launch the reader process


p_output.close()       # We no longer need this part of the Pipe()
_start = time.time()
writer(count, p_input) # Send a lot of stuff to reader_proc()
p_input.close()
reader_p.join()
print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
(time.time() - _start)))

"""
multi_queue.py
"""


from multiprocessing import Process, Queue
import time
import sys


def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get()         # Read from the queue and do nothing
if (msg == 'DONE'):
break


def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii)             # Write 'count' numbers into the queue
queue.put('DONE')


if __name__=='__main__':
pqueue = Queue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start()        # Launch reader_proc() as a separate python process


_start = time.time()
writer(count, pqueue)    # Send a lot of stuff to reader()
reader_p.join()         # Wait for the reader to finish
print("Sending {0} numbers to Queue() took {1} seconds".format(count,
(time.time() - _start)))

"""
multi_simplequeue.py
"""


from multiprocessing import Process, SimpleQueue
import time
import sys


def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get()         # Read from the queue and do nothing
if (msg == 'DONE'):
break


def writer(count, queue):
## Write to the queue
for ii in range(0, count):
queue.put(ii)             # Write 'count' numbers into the queue
queue.put('DONE')


if __name__=='__main__':
pqueue = SimpleQueue() # writer() writes to pqueue from _this_ process
for count in [10**4, 10**5, 10**6]:
### reader_proc() reads from pqueue as a separate process
reader_p = Process(target=reader_proc, args=((pqueue),))
reader_p.daemon = True
reader_p.start()        # Launch reader_proc() as a separate python process


_start = time.time()
writer(count, pqueue)    # Send a lot of stuff to reader()
reader_p.join()         # Wait for the reader to finish
print("Sending {0} numbers to SimpleQueue() took {1} seconds".format(count,
(time.time() - _start)))

"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time


def reader_proc(queue):
## Read from the queue; this will be spawned as a separate Process
while True:
msg = queue.get()         # Read from the queue and do nothing
queue.task_done()


def writer(count, queue):
for ii in range(0, count):
queue.put(ii)             # Write 'count' numbers into the queue


if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
# reader_proc() reads from jqueue as a different process...
reader_p = Process(target=reader_proc, args=((jqueue),))
reader_p.daemon = True
reader_p.start()     # Launch the reader process
_start = time.time()
writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
jqueue.join()         # Wait for the reader to finish
print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count,
(time.time() - _start)))

Another feature of Queue() that is worth noting is the feeder thread. The documentation notes that "when a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe." An unlimited number of items (or up to maxsize, if one is set) can be inserted into a Queue() without any call to queue.put() blocking. This allows you to store multiple items in a Queue() until your program is ready to process them.
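A quick sketch of that buffering behavior (my own illustration, not from the benchmarks above): put() returns immediately even though no reader is attached, because the feeder thread handles the transfer in the background:

```python
from multiprocessing import Queue
import time

if __name__ == '__main__':
    q = Queue()              # default: unbounded maxsize
    _start = time.time()
    for ii in range(100000):
        q.put(ii)            # returns immediately; the feeder thread moves
                             # items from an internal buffer into the pipe
    print("Buffered 100000 items with no reader attached "
          "in {0:.3f} seconds".format(time.time() - _start))
    # The items remain retrievable whenever a consumer is ready; draining
    # them also lets the feeder thread flush so the process can exit cleanly
    for ii in range(100000):
        assert q.get() == ii
```

Note that the same loop against a raw Pipe() connection would eventually block once the OS pipe buffer filled up, which is the contrast drawn in the next paragraph.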

A Pipe(), on the other hand, has a finite amount of storage for items that have been sent to one connection but not yet received from the other connection. After this storage is used up, calls to connection.send() will block until there is space to write the entire item. This stalls the thread doing the writing until some other thread reads from the pipe. Connection objects give you access to the underlying file descriptor. On *nix systems, you can prevent connection.send() calls from blocking using the os.set_blocking() function. However, this causes problems if you try to send an item that does not fit in the pipe's buffer. Recent versions of Linux allow you to increase the size of a pipe, but the maximum allowed size varies based on system configuration. You should therefore never rely on Pipe() to buffer data: calls to connection.send() could block until data gets read from the pipe somewhere else.

In conclusion, a Queue is a better choice than a Pipe when you need to buffer data, even if you only need two points to communicate.

If, like me, you are wondering how using the multiprocessing constructs (Pipe and Queue) in your threading programs affects performance, I have adapted Mike Pennington's script to compare against queue.Queue and queue.SimpleQueue:

Sending 10000 numbers to mp.Pipe() took 65.051 ms
Sending 10000 numbers to mp.Queue() took 78.977 ms
Sending 10000 numbers to queue.Queue() took 14.781 ms
Sending 10000 numbers to queue.SimpleQueue() took 0.939 ms
Sending 100000 numbers to mp.Pipe() took 449.564 ms
Sending 100000 numbers to mp.Queue() took 811.938 ms
Sending 100000 numbers to queue.Queue() took 149.387 ms
Sending 100000 numbers to queue.SimpleQueue() took 9.264 ms
Sending 1000000 numbers to mp.Pipe() took 4660.451 ms
Sending 1000000 numbers to mp.Queue() took 8499.743 ms
Sending 1000000 numbers to queue.Queue() took 1490.062 ms
Sending 1000000 numbers to queue.SimpleQueue() took 91.238 ms
Sending 10000000 numbers to mp.Pipe() took 45095.935 ms
Sending 10000000 numbers to mp.Queue() took 84829.042 ms
Sending 10000000 numbers to queue.Queue() took 15179.356 ms
Sending 10000000 numbers to queue.SimpleQueue() took 917.562 ms

Unsurprisingly, using the queue package yields much better results if all you have is threads. That said, I was surprised how performant queue.SimpleQueue is.


"""
pipe_performance.py
"""
import threading as td
import queue
import multiprocessing as mp
import multiprocessing.connection as mp_connection
import time
import typing


def reader_pipe(p_out: mp_connection.Connection) -> None:
while True:
msg = p_out.recv()
if msg=='DONE':
break


def reader_queue(p_queue: queue.Queue[typing.Union[str, int]]) -> None:
while True:
msg = p_queue.get()
if msg=='DONE':
break


if __name__=='__main__':
# first: mp.pipe
for count in [10**4, 10**5, 10**6, 10**7]:
p_mppipe_out, p_mppipe_in = mp.Pipe()
reader_p = td.Thread(target=reader_pipe, args=((p_mppipe_out),))
reader_p.start()
_start = time.time()
for ii in range(0, count):
p_mppipe_in.send(ii)
p_mppipe_in.send('DONE')
reader_p.join()
print(f"Sending {count} numbers to mp.Pipe() took {(time.time() - _start)*1e3:.3f} ms")


# second: mp.Queue
p_mpqueue  = mp.Queue()
reader_p = td.Thread(target=reader_queue, args=((p_mpqueue),))
reader_p.start()
_start = time.time()
for ii in range(0, count):
p_mpqueue.put(ii)
p_mpqueue.put('DONE')
reader_p.join()
print(f"Sending {count} numbers to mp.Queue() took {(time.time() - _start)*1e3:.3f} ms")


# third: queue.Queue
p_queue = queue.Queue()
reader_p = td.Thread(target=reader_queue, args=((p_queue),))
reader_p.start()
_start = time.time()
for ii in range(0, count):
p_queue.put(ii)
p_queue.put('DONE')
reader_p.join()
print(f"Sending {count} numbers to queue.Queue() took {(time.time() - _start)*1e3:.3f} ms")


# fourth: queue.SimpleQueue
p_squeue = queue.SimpleQueue()
reader_p = td.Thread(target=reader_queue, args=((p_squeue),))
reader_p.start()
_start = time.time()
for ii in range(0, count):
p_squeue.put(ii)
p_squeue.put('DONE')
reader_p.join()
print(f"Sending {count} numbers to queue.SimpleQueue() took {(time.time() - _start)*1e3:.3f} ms")