How to use multiprocessing pool.map with multiple arguments

In the Python multiprocessing library, is there a variant of pool.map which supports multiple arguments?

import multiprocessing

text = "test"

def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text, case), case, 1)
    pool.close()
    pool.join()

The answer to this is version- and situation-dependent. The most general answer for recent versions of Python (since 3.3) was first described by J. F. Sebastian.¹ It uses the Pool.starmap method, which accepts a sequence of argument tuples. It then automatically unpacks the arguments from each tuple and passes them to the given function:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

For earlier versions of Python, you'll need to write a helper function to unpack the arguments explicitly. If you want to use with, you'll also need to write a wrapper to turn Pool into a context manager. (Thanks to muon for pointing this out.)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

In simpler cases, with a fixed second argument, you can also use partial, but only in Python 2.7+.

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

¹ Much of this was inspired by his answer, which should probably have been accepted instead. But since this one is stuck at the top, it seemed best to improve it for future readers.

Is there a variant of pool.map which supports multiple arguments?

Python 3.3 includes the pool.starmap() method:

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1, 2, 3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__ == "__main__":
    freeze_support()
    main()

For older versions:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1, 2, 3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__ == "__main__":
    freeze_support()
    main()

Output

1 1
2 1
3 1

Notice how itertools.izip() and itertools.repeat() are used here.

Due to the bug mentioned by @unutbu, you can't use functools.partial() or similar capabilities on Python 2.6, so the simple wrapper function func_star() should be defined explicitly. See also the workaround suggested in another answer.

I think the below will be better:

def multi_run_wrapper(args):
    return add(*args)

def add(x, y):
    return x + y

if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper, [(1, 2), (2, 3), (3, 4)])
    print results

Output

[3, 5, 7]

There is a fork of multiprocessing called pathos (note: use the version on GitHub) that doesn't need starmap: the map functions mirror the API for Python's map, so map can take multiple arguments.

With pathos, you can also generally do multiprocessing in the interpreter, instead of being stuck in the __main__ block. Pathos is due for a release, after some mild updating, mostly conversion to Python 3.x.

Python 2.7.5 (default, Sep 30 2013, 20:15:49)
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> def func(a,b):
...     print a,b
...
>>>
>>> from pathos.multiprocessing import ProcessingPool
>>> pool = ProcessingPool(nodes=4)
>>> pool.map(func, [1,2,3], [1,1,1])
1 1
2 1
3 1
[None, None, None]
>>>
>>> # also can pickle stuff like lambdas
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
>>> # also does asynchronous map
>>> result = pool.amap(pow, [1,2,3], [4,5,6])
>>> result.get()
[1, 32, 729]
>>>
>>> # or can return a map iterator
>>> result = pool.imap(pow, [1,2,3], [4,5,6])
>>> result
<processing.pool.IMapIterator object at 0x110c2ffd0>
>>> list(result)
[1, 32, 729]

pathos has several ways that you can get the exact behavior of starmap.

>>> def add(*x):
...   return sum(x)
...
>>> x = [[1,2,3],[4,5,6]]
>>> import pathos
>>> import numpy as np
>>> # use ProcessPool's map and transposing the inputs
>>> pp = pathos.pools.ProcessPool()
>>> pp.map(add, *np.array(x).T)
[6, 15]
>>> # use ProcessPool's map and a lambda to apply the star
>>> pp.map(lambda x: add(*x), x)
[6, 15]
>>> # use a _ProcessPool, which has starmap
>>> _pp = pathos.pools._ProcessPool()
>>> _pp.starmap(add, x)
[6, 15]
>>>

After learning about itertools in J. F. Sebastian's answer, I decided to take it a step further and write a parmap package that takes care of parallelization, offering map and starmap functions in Python 2.7 and Python 3.2 (and later) that can take any number of positional arguments.

Installation

pip install parmap

How to parallelize:

import parmap

# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x, y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param1 = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
    listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

I have uploaded parmap to PyPI and to a GitHub repository.

As an example, the question can be answered as follows:

import parmap

def harvester(case, text):
    X = case[0]
    text + str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)

Another way is to pass a list of lists to a one-argument routine:

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()
pool.map(task, [[1, 2], [3, 4], [5, 6], [7, 8]])

One can then construct a list of lists of arguments with one's favorite method.
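For instance, a minimal sketch of that construction with a list comprehension (Python 3 syntax; the pair values are made up for illustration):

import os
from multiprocessing import Pool

def task(args):
    # Same task as above, written with Python 3 print()
    print("PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1])

if __name__ == '__main__':
    # Build the list of argument lists with a comprehension
    pairs = [[n, n + 1] for n in range(1, 8, 2)]  # [[1, 2], [3, 4], [5, 6], [7, 8]]
    with Pool() as pool:
        pool.map(task, pairs)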

You can use the following two functions so as to avoid writing a wrapper for each new function:

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

Use the function function with lists of the arguments arg_0, arg_1 and arg_2 as follows:

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2))
pool.close()
pool.join()
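As a concrete sketch of how these helpers fit together (the three-argument add3 and its input lists are hypothetical):

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

def add3(a, b, c):
    return a + b + c

if __name__ == '__main__':
    pool = Pool(2)
    # Each position-wise triple (1, 10, 100) and (2, 20, 200) becomes one call
    results = pool.map(universal_worker, pool_args(add3, [1, 2], [10, 20], [100, 200]))
    pool.close()
    pool.join()
    print(results)  # [111, 222]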

Using Python 3.3+, with pool.starmap():

from multiprocessing.dummy import Pool as ThreadPool

def write(i, x):
    print(i, "---", x)

a = ["1", "2", "3"]
b = ["4", "5", "6"]

pool = ThreadPool(2)
pool.starmap(write, zip(a, b))
pool.close()
pool.join()

Result:

1 --- 4
2 --- 5
3 --- 6

You can also zip() more arguments if you like: zip(a, b, c, d, e).
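For example, a minimal sketch of that with a hypothetical three-parameter write3:

from multiprocessing.dummy import Pool as ThreadPool

def write3(i, x, y):
    print(i, "---", x, "---", y)

a = ["1", "2", "3"]
b = ["4", "5", "6"]
c = ["7", "8", "9"]

pool = ThreadPool(2)
# zip() three lists; starmap unpacks each (i, x, y) triple
pool.starmap(write3, zip(a, b, c))
pool.close()
pool.join()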

In case you want to have a constant value passed as an argument:

import itertools
zip(itertools.repeat(constant), a)
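Putting that together, a minimal sketch (reusing write from above; the constant "k" is made up):

import itertools
from multiprocessing.dummy import Pool as ThreadPool

def write(i, x):
    print(i, "---", x)

a = ["1", "2", "3"]

pool = ThreadPool(2)
# Pair the constant with every element of a
pool.starmap(write, zip(itertools.repeat("k"), a))
pool.close()
pool.join()
# Prints: k --- 1, k --- 2, k --- 3 (order may vary)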

In case your function should return something:

results = pool.starmap(write, zip(a,b))

This gives a list with the returned values.

As of Python 3.4.4, you can use multiprocessing.get_context() to obtain a context object to use multiple start methods:

import multiprocessing as mp

def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q, 'hello', 'world'))
    p.start()
    print(q.get())
    p.join()

Or you just simply replace

pool.map(harvester(text, case), case, 1)

with:

pool.apply_async(harvester, (text, case))

A better solution is to use a decorator instead of writing a wrapper function by hand. Especially when you have a lot of functions to map, a decorator will save time by avoiding writing a wrapper for every function. Usually a decorated function is not picklable, but we may use functools to get around it. More discussion can be found here.

Here is an example:

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

Then you may map it with zipped arguments:

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

Of course, you may always use Pool.starmap in Python 3 (>=3.3) as mentioned in other answers.

Another simple alternative is to wrap your function parameters in a tuple and then also wrap the parameters that should be passed in tuples. This is perhaps not ideal when dealing with large pieces of data. I believe it would make copies for each tuple.

from multiprocessing import Pool

def f((a, b, c, d)):
    print a, b, c, d
    return a + b + c + d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0, i+1, i+2, i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

This gives the output in a random order:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]

In the official documentation it states that it supports only one iterable argument. I like to use apply_async in such cases. In your case I would do:

from multiprocessing import Process, Pool, Manager

text = "test"

def harvester(text, case, q=None):
    X = case[0]
    res = text + str(X)
    if q:
        q.put(res)
    return res


def block_until(q, results_queue, until_counter=0):
    i = 0
    while i < until_counter:
        results_queue.put(q.get())
        i += 1

if __name__ == '__main__':
    pool = Pool(processes=6)
    case = RAW_DATASET
    m = Manager()
    q = m.Queue()
    results_queue = m.Queue()  # when it completes, the results will reside in this queue
    blocking_process = Process(target=block_until, args=(q, results_queue, len(case)))
    blocking_process.start()
    for c in case:
        try:
            res = pool.apply_async(harvester, (text, c, q))
            res.get(timeout=0.1)
        except:
            pass
    blocking_process.join()

A better solution for Python 2:

from multiprocessing import Pool

def func((i, (a, b))):
    print i, a, b
    return a + b

pool = Pool(3)
pool.map(func, [(0, (1, 2)), (1, (2, 3)), (2, (3, 4))])

Output

2 3 4
1 2 3
0 1 2
out[]:
[3, 5, 7]

How to take multiple arguments:

def f1(args):
    a, b, c = args[0], args[1], args[2]
    return a + b + c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4)

    result1 = pool.map(f1, [[1, 2, 3]])
    print(result1)

For Python 2, you can use this trick:

import multiprocessing

def fun(a, b):
    return a + b

pool = multiprocessing.Pool(processes=6)
b = 233
pool.map(lambda x: fun(x, b), range(1000))
text = "test"
def unpack(args):return args[0](*args[1:])
def harvester(text, case):X = case[0]text+ str(X)
if __name__ == '__main__':pool = multiprocessing.Pool(processes=6)case = RAW_DATASET# args is a list of tuples# with the function to execute as the first item in each tupleargs = [(harvester, text, c) for c in case]# doing it this way, we can pass any function# and we don't need to define a wrapper for each different function# if we need to use more than onepool.map(unpack, args)pool.close()pool.join()

Here is an example of the routine I use to pass multiple arguments to a one-argument function used in a pool.imap fork:

from multiprocessing import Pool

# Wrapper of the function to map:
class makefun:
    def __init__(self, var2):
        self.var2 = var2
    def fun(self, i):
        var2 = self.var2
        return var1[i] + var2

# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]

# Open the pool:
pool = Pool(processes=2)

# Wrapper loop
for j in range(len(var2)):
    # Obtain the function to map
    pool_fun = makefun(var2[j]).fun

    # Fork loop
    for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
        print(var1[i], '+', var2[j], '=', value)

# Close the pool
pool.close()

There are a lot of answers here, but none seems to provide Python 2/3 compatible code that will work on any version. If you want your code to just work, this will work for either Python version:

import sys
import multiprocessing

# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
    from contextlib import contextmanager
    @contextmanager
    def multiprocessing_context(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()
else:
    multiprocessing_context = multiprocessing.Pool

After that, you can use multiprocessing the regular Python 3 way, however you like. For example:

def _function_to_run_for_each(x):
    return x.lower()

with multiprocessing_context(processes=3) as pool:
    results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim'])
print(results)

will work in Python 2 or Python 3.

Here is another way to do it that IMHO is more simple and elegant than any of the other answers provided.

This program has a function that takes two parameters, prints them out, and also prints the sum:

import multiprocessing

def main():

    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with

# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()

The output is:

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8

See the Python documentation for more information:

https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool

In particular, be sure to check out the starmap function.

I'm using Python 3.6; I'm not sure if this will work with older Python versions.

Why there is not a very straightforward example like this in the documentation, I'm not sure.

This might be another option. The trick is in the wrapper function, which returns another function that is passed in to pool.map. The code below reads an input array, and for each (unique) element in it, returns how many times (i.e., counts) that element appears in the array. For example, if the input is

np.eye(3) = [ [1. 0. 0.]
              [0. 1. 0.]
              [0. 0. 1.] ]

then zero appears 6 times and one appears 3 times.

import numpy as np
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import cpu_count


def extract_counts(label_array):
    labels = np.unique(label_array)
    out = extract_counts_helper([label_array], labels)
    return out

def extract_counts_helper(args, labels):
    n = max(1, cpu_count() - 1)
    pool = ThreadPool(n)
    results = {}
    pool.map(wrapper(args, results), labels)
    pool.close()
    pool.join()
    return results

def wrapper(argsin, results):
    def inner_fun(label):
        label_array = argsin[0]
        counts = get_label_counts(label_array, label)
        results[label] = counts
    return inner_fun

def get_label_counts(label_array, label):
    return sum(label_array.flatten() == label)

if __name__ == "__main__":
    img = np.ones([2, 2])
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")

    img = np.eye(3)
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")

    img = np.random.randint(5, size=(3, 3))
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")

You should get:

input array:
 [[1. 1.]
 [1. 1.]]
label counts:  {1.0: 4}
========
input array:
 [[1. 0. 0.]
 [0. 1. 0.]
 [0. 0. 1.]]
label counts:  {0.0: 6, 1.0: 3}
========
input array:
 [[4 4 0]
 [2 4 3]
 [2 3 1]]
label counts:  {0: 1, 1: 1, 2: 2, 3: 2, 4: 3}
========

Store all your arguments as an array of tuples.

The example says that normally you would call your function as:

def mainImage(fragCoord: vec2, iResolution: vec3, iTime: float) -> vec3:

Instead, pass one tuple and unpack the arguments:

def mainImage(package_iter) -> vec3:
    fragCoord = package_iter[0]
    iResolution = package_iter[1]
    iTime = package_iter[2]

Build up the tuples by using a loop beforehand:

package_iter = []
iResolution = vec3(nx, ny, 1)
for j in range((ny-1), -1, -1):
    for i in range(0, nx, 1):
        fragCoord: vec2 = vec2(i, j)
        time_elapsed_seconds = 10
        package_iter.append((fragCoord, iResolution, time_elapsed_seconds))

Then execute everything using map by passing the array of tuples:

array_rgb_values = []

with concurrent.futures.ProcessPoolExecutor() as executor:
    for val in executor.map(mainImage, package_iter):
        fragColor = val
        ir = clip(int(255 * fragColor.r), 0, 255)
        ig = clip(int(255 * fragColor.g), 0, 255)
        ib = clip(int(255 * fragColor.b), 0, 255)

        array_rgb_values.append((ir, ig, ib))

I know Python has * and ** for unpacking, but I haven't tried those yet.
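For reference, a minimal self-contained sketch of what that could look like: keep the original multi-parameter signature and unpack each tuple with * in a small top-level wrapper. The wrapper name, stand-in function body, and values are all hypothetical:

import concurrent.futures

def mainImage(fragCoord, iResolution, iTime):
    # Stand-in body for illustration; the real shader logic would go here
    return (fragCoord, iResolution, iTime)

def mainImage_star(package):
    # Unpack the (fragCoord, iResolution, iTime) tuple with *
    return mainImage(*package)

if __name__ == '__main__':
    # Made-up values standing in for the loop-built package_iter above
    package_iter = [((i, j), (8, 8, 1), 10) for j in range(2) for i in range(2)]
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(mainImage_star, package_iter))
    print(results)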

It is also better to use the higher-level concurrent.futures library than to use the low-level multiprocessing library.
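A minimal sketch of why that helps here (the two-argument add and its inputs are made up for illustration): Executor.map accepts one iterable per function parameter, so no wrapper or starmap is needed:

from concurrent.futures import ProcessPoolExecutor

def add(a, b):
    return a + b

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        # One iterable per parameter: add(1, 10), add(2, 20), add(3, 30)
        results = list(executor.map(add, [1, 2, 3], [10, 20, 30]))
    print(results)  # [11, 22, 33]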

import time
from multiprocessing import Pool


def f1(args):
    vfirst, vsecond, vthird = args[0], args[1], args[2]
    print(f'First Param: {vfirst}, Second value: {vsecond} and finally third value is: {vthird}')
    pass


if __name__ == '__main__':
    p = Pool()
    result = p.map(f1, [['Dog', 'Cat', 'Mouse']])
    p.close()
    p.join()
    print(result)

For me, the below was a short and simple solution:

from multiprocessing.pool import ThreadPool
from functools import partial
from time import sleep
from random import randint

def dosomething(var, s):
    sleep(randint(1, 5))
    print(var)
    return var + s

array = ["a", "b", "c", "d", "e"]
with ThreadPool(processes=5) as pool:
    resp_ = pool.map(partial(dosomething, s="2"), array)
    print(resp_)

Output:

a
b
d
e
c
['a2', 'b2', 'c2', 'd2', 'e2']