多处理。Pool: map_async和imap之间的区别是什么?

我正在学习如何使用Python的multiprocessing包,但我不明白map_asyncimap之间的区别。 我注意到map_asyncimap都是异步执行的。那么什么时候我应该用一种而不是另一种呢?我应该如何检索map_async返回的结果?< / p >

我应该用这样的东西吗?

def test():
result = pool.map_async()
pool.close()
pool.join()
return result.get()


result=test()
for i in result:
print i
127937 次浏览

imap/imap_unorderedmap/map_async之间有两个关键区别:

  1. 它们消耗你传递给它们的可迭代对象的方式。
  2. 他们把结果返回给你的方式。

map通过将可迭代对象转换为列表(假设它还不是列表)来消耗可迭代对象,将其分解为块,并将这些块发送给Pool中的工作进程。将可迭代对象分解成块比在进程之间一次传递一个可迭代对象中的每个项执行得更好——特别是如果可迭代对象很大的话。然而,将可迭代对象转换为一个列表以将其分成块可能会有非常高的内存成本,因为整个列表将需要保存在内存中。

imap不会把你给它的可迭代对象转换成一个列表,也不会把它分解成块(默认情况下)。它将每次迭代可迭代的一个元素,并将它们每个发送到工作进程。这意味着将整个可迭代对象转换为列表不会占用内存,但这也意味着大型可迭代对象的性能较慢,因为缺少分块。但是,可以通过传递一个大于默认值1的chunksize参数来缓解这一问题。

imap/imap_unorderedmap/map_async之间的另一个主要区别是,使用imap/imap_unordered,您可以在工作人员准备好后立即开始接收结果,而不必等待所有工作人员完成。对于map_asyncAsyncResult会立即返回,但在所有结果都被处理完之前,你无法从该对象中检索结果,此时它会返回与map相同的列表(map实际上在内部实现为imap_unordered0)。不可能得到部分结果;你要么得到全部结果,要么什么都没有。

imapimap_unordered都立即返回可迭代对象。使用imap,结果将在迭代对象准备好后立即从迭代对象中产生,同时仍然保留输入迭代对象的顺序。使用imap_unordered,结果一准备好就会产生,不管输入可迭代对象的顺序如何。假设你有这个:

import multiprocessing
import time


def func(x):
time.sleep(x)
return x + 2


if __name__ == "__main__":
p = multiprocessing.Pool()
start = time.time()
for x in p.imap(func, [1,5,3]):
print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))

这将输出:

3 (Time elapsed: 1s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

如果你使用p.imap_unordered而不是p.imap,你会看到:

3 (Time elapsed: 1s)
5 (Time elapsed: 3s)
7 (Time elapsed: 5s)

如果你使用p.mapp.map_async().get(),你会看到:

3 (Time elapsed: 5s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

因此,使用imap/imap_unordered而不是map_async的主要原因是:

  1. 你的iterable足够大,以至于将它转换成一个列表会导致你耗尽/使用太多的内存。
  2. 你希望能够在所有完成之前开始处理结果。

公认的答案是,对于imap_unordered,“结果将在他们准备好后立即产生”。人们可能会推断结果将按照完成的顺序返回。但我只是想说清楚,这不是真的在一般情况下。文档声明结果以任意的顺序返回。考虑下面的程序,它使用池大小为4,可迭代的大小为20,chunksize值为5。worker函数的休眠时间是可变的,这取决于它传递的参数,这也确保池中没有一个进程获取所有提交的任务。因此,我期望池中的每个进程都有20 / 4 = 5任务要处理:

from multiprocessing import Pool
import time


def worker(x):
print(f'x = {x}', flush=True)
time.sleep(.1 * (20 - x))
# return approximate completion time with passed argument:
return time.time(), x


if __name__ == '__main__':
pool = Pool(4)
results = pool.imap_unordered(worker, range(20), chunksize=5)
for t, x in results:
print('result:', t, x)

打印:

x = 0
x = 5
x = 10
x = 15
x = 16
x = 17
x = 11
x = 18
x = 19
x = 6
result: 1621512513.7737606 15
result: 1621512514.1747007 16
result: 1621512514.4758775 17
result: 1621512514.675989 18
result: 1621512514.7766125 19
x = 12
x = 1
x = 13
x = 7
x = 14
x = 2
result: 1621512514.2716103 10
result: 1621512515.1721854 11
result: 1621512515.9727488 12
result: 1621512516.6744206 13
result: 1621512517.276999 14
x = 8
x = 9
x = 3
result: 1621512514.7695887 5
result: 1621512516.170747 6
result: 1621512517.4713914 7
result: 1621512518.6734042 8
result: 1621512519.7743165 9
x = 4
result: 1621512515.268784 0
result: 1621512517.1698637 1
result: 1621512518.9698756 2
result: 1621512520.671273 3
result: 1621512522.2716706 4

您可以清楚地看到,这些结果不是按完成顺序产生的。例如,我已经返回了1621512519.7743165 9后面跟着1621512515.268784 0,它是由工作者函数比之前返回的结果早4秒多返回的。然而,如果我将chunksize值改为1,打印输出将变成:

x = 0
x = 1
x = 2
x = 3
x = 4
result: 1621513028.888357 3
x = 5
result: 1621513028.9863524 2
x = 6
result: 1621513029.0838938 1
x = 7
result: 1621513029.1825204 0
x = 8
result: 1621513030.4842813 7
x = 9
result: 1621513030.4852195 6
x = 10
result: 1621513030.4872172 5
x = 11
result: 1621513030.4892178 4
x = 12
result: 1621513031.3908074 11
x = 13
result: 1621513031.4895358 10
x = 14
result: 1621513031.587289 9
x = 15
result: 1621513031.686152 8
x = 16
result: 1621513032.1877549 15
x = 17
result: 1621513032.1896958 14
x = 18
result: 1621513032.1923752 13
x = 19
result: 1621513032.1923752 12
result: 1621513032.2935638 19
result: 1621513032.3927407 18
result: 1621513032.4912949 17
result: 1621513032.5884912 16

这个完成顺序。然而,我很犹豫地声明,imap_unordered 将永远返回结果,因为它们变得可用如果一个chunksize值1被指定,尽管这似乎是基于这个实验的情况,因为文档没有这样的声明。

讨论

当指定chunksize为5时,这20个任务被放置在一个输入队列中,以便池中的4个进程以大小为5的块进行处理。因此,空闲的进程将从队列中取出5个任务的下一个块,并在再次空闲之前依次处理每个任务。因此,第一个进程将处理x参数0到4,第二个进程将处理x参数5到9,等等。这就是为什么你会看到x的初始值被打印为0,5,10和15。

但是,虽然x参数0的结果在x参数9的结果之前完成,但结果似乎会作为块一起写入,因此x参数0的结果将不会返回,直到在同一块中排队的x参数的结果(即1,2,3和4)也可用。