在 Python 中向 pool.map()函数传递多个参数

我需要一些方法来使用 pool.map ()中接受多个参数的函数。根据我的理解,pool.map ()的目标函数只能有一个可迭代的参数,但是有没有一种方法可以传递其他参数呢?在这种情况下,我需要向目标函数传入一些配置变量,比如 Lock ()和日志信息。

我已经尝试做了一些研究,我认为我可以使用部分函数来使它工作?然而,我不完全理解这些工作原理。任何帮助都将不胜感激!下面是我想做的一个简单的例子:

def target(items, lock):
for item in items:
# Do cool stuff
if (... some condition here ...):
lock.acquire()
# Write to stdout or logfile, etc.
lock.release()


def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
pool.map(target(PASS PARAMS HERE), iterable)
pool.close()
pool.join()
156798 次浏览

您可以使用 functools.partial(正如您所怀疑的) :

from functools import partial


def target(lock, iterable_item):
for item in iterable_item:
# Do cool stuff
if (... some condition here ...):
lock.acquire()
# Write to stdout or logfile, etc.
lock.release()


def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
l = multiprocessing.Lock()
func = partial(target, l)
pool.map(func, iterable)
pool.close()
pool.join()

例如:

def f(a, b, c):
print("{} {} {}".format(a, b, c))


def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
a = "hi"
b = "there"
func = partial(f, a, b)
pool.map(func, iterable)
pool.close()
pool.join()


if __name__ == "__main__":
main()

产出:

hi there 1
hi there 2
hi there 3
hi there 4
hi there 5

如果你不能访问 functools.partial,你也可以使用包装函式。

def target(lock):
def wrapped_func(items):
for item in items:
# Do cool stuff
if (... some condition here ...):
lock.acquire()
# Write to stdout or logfile, etc.
lock.release()
return wrapped_func


def main():
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
lck = multiprocessing.Lock()
pool.map(target(lck), iterable)
pool.close()
pool.join()

这使得 target()成为一个接受锁(或者您想要给出的任何参数)的函数,并且它将返回一个只接受迭代的函数作为输入,但仍然可以使用所有其他参数。这就是最终传递给 pool.map()的内容,然后应该可以毫无问题地执行。

您可以使用允许多个参数的 map 函数,就像 pathos中的 multiprocessing分叉一样。

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> def add_and_subtract(x,y):
...   return x+y, x-y
...
>>> res = Pool().map(add_and_subtract, range(0,20,2), range(-5,5,1))
>>> res
[(-5, 5), (-2, 6), (1, 7), (4, 8), (7, 9), (10, 10), (13, 11), (16, 12), (19, 13), (22, 14)]
>>> Pool().map(add_and_subtract, *zip(*res))
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]

pathos使您能够轻松地用多个输入嵌套分层并行映射,因此我们可以扩展我们的示例来演示这一点。

>>> from pathos.multiprocessing import ThreadingPool as TPool
>>>
>>> res = TPool().amap(add_and_subtract, *zip(*Pool().map(add_and_subtract, range(0,20,2), range(-5,5,1))))
>>> res.get()
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]

更有趣的是,构建一个可以传递到 Pool 中的嵌套函数。 这是可能的,因为 pathos使用 dill,它可以序列化 python 中的几乎所有内容。

>>> def build_fun_things(f, g):
...   def do_fun_things(x, y):
...     return f(x,y), g(x,y)
...   return do_fun_things
...
>>> def add(x,y):
...   return x+y
...
>>> def sub(x,y):
...   return x-y
...
>>> neato = build_fun_things(add, sub)
>>>
>>> res = TPool().imap(neato, *zip(*Pool().map(neato, range(0,20,2), range(-5,5,1))))
>>> list(res)
[(0, -10), (4, -8), (8, -6), (12, -4), (16, -2), (20, 0), (24, 2), (28, 4), (32, 6), (36, 8)]

但是,如果您不能走出标准库,那么您将不得不使用另一种方法。在这种情况下,您最好的选择是使用 multiprocessing.starmap,如下所示: 用于多个参数的 Python 多处理 pool.map(@Roberto 在 OP 文章的评论中提到)

这里是 pathos: https://github.com/uqfoundation