Multiprocessing: How to use Pool.map on a function defined in a class?

When I run code like the following:

from multiprocessing import Pool

p = Pool(5)
def f(x):
    return x*x

p.map(f, [1,2,3])

it works fine. However, when I make it a function of a class:

class calculate(object):
    def run(self):
        def f(x):
            return x*x

        p = Pool()
        return p.map(f, [1,2,3])


cl = calculate()
print cl.run()

it gives the following error:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I've seen a post by Alex Martelli dealing with the same kind of problem, but it wasn't explicit enough.


Functions defined inside classes (or even inside functions inside classes) don't really pickle. However, this works:

from multiprocessing import Pool


def f(x):
    return x*x


class calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])


cl = calculate()
print cl.run()
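
To see the failure directly: pickle looks functions up by their module-level name, so a function defined inside another scope cannot be found on import. A quick standalone check (my illustration, not part of the original answer):

import pickle


def outer():
    def inner(x):
        return x * x
    return inner


try:
    pickle.dumps(outer())
except Exception as e:
    # Python 2 raises PicklingError ("attribute lookup __builtin__.function
    # failed", exactly as in the question); Python 3 complains it "can't
    # pickle local object". Either way the cause is the same: 'inner' is
    # not importable by name.
    print(e)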

There is currently no solution to your problem, as far as I know: the function that you give to map() must be accessible through an import of your module. This is why robert's code works: the function f() can be obtained by importing the following code:

from multiprocessing import Pool


def f(x):
    return x*x


class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])


if __name__ == '__main__':
    cl = Calculate()
    print cl.run()

I actually added a "main" section, because this follows the recommendations for the Windows platform ("Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects").

I also added a capital letter in front of Calculate, so as to follow PEP 8. :)

I was also annoyed by restrictions on what sort of functions pool.map could accept. I wrote the following to circumvent this. It appears to work, even for recursive use of parmap.

from multiprocessing import Process, Pipe
from itertools import izip


def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun


def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p, c) in pipe]


if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))

I've also struggled with this. I had functions as data members of a class, as a simplified example:

from multiprocessing import Pool
import itertools

pool = Pool()


class Example(object):
    def __init__(self, my_add):
        self.f = my_add

    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f, list1, list2)

I needed to call Pool.map() from within the same class, and self.f did not take a tuple as an argument. Since this function was embedded in a class, it was not clear to me how to write the kind of wrapper other answers suggested.

I solved this problem by using a different wrapper that takes a tuple/list, where the first element is the function, and the remaining elements are the arguments to that function, called eval_func_tuple(f_args). Using this, the problematic line can be replaced by return pool.map(eval_func_tuple, itertools.izip(itertools.repeat(self.f), list1, list2)). Here is the full code:

File: util.py

def add(a, b): return a+b


def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])

File: main.py

from multiprocessing import Pool
import itertools
import util


pool = Pool()


class Example(object):
    def __init__(self, my_add):
        self.f = my_add

    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple,
                        itertools.izip(itertools.repeat(self.f), list1, list2))


if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)

Running main.py gives [11, 22, 33]. Feel free to improve this; for example, eval_func_tuple could also be modified to take keyword arguments (one possible shape is sketched below).
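
As one possible shape for that keyword-argument variant (a sketch of my own; the (function, args, kwargs) tuple layout is an assumption, not the original author's code):

def eval_func_kwargs(f_args):
    """Hypothetical variant of eval_func_tuple: (function, args, kwargs)."""
    f, args, kwargs = f_args
    return f(*args, **kwargs)


# Usage sketch:
# pool.map(eval_func_kwargs, [(util.add, (1, 10), {}), (util.add, (2, 20), {})])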

On another note, the "parmap" function in another answer can be made more efficient for the case of more processes than the number of available CPUs. I'm copying an edited version below. This is my first post and I wasn't sure if I should directly edit the original answer. I also renamed some variables.

import multiprocessing
from multiprocessing import Process, Pipe
from itertools import izip


def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun


def parmap(f, X):
    pipe = [Pipe() for x in X]
    processes = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    numProcesses = len(processes)
    processNum = 0
    outputList = []
    while processNum < numProcesses:
        # start/join at most cpu_count() processes at a time
        endProcessNum = min(processNum + multiprocessing.cpu_count(), numProcesses)
        for proc in processes[processNum:endProcessNum]:
            proc.start()
        for proc in processes[processNum:endProcessNum]:
            proc.join()
        for proc, c in pipe[processNum:endProcessNum]:
            outputList.append(proc.recv())
        processNum = endProcessNum
    return outputList


if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))

mrule's solution is correct but has a bug: if the child sends back a large amount of data, it can fill the pipe's buffer, blocking on the child's pipe.send(), while the parent is waiting for the child to exit with join(). The solution is to read the child's data before join()ing the child. Furthermore the child should close the parent's end of the pipe to prevent a deadlock. The code below fixes that. Also be aware that this parmap creates one process per element in X. A more advanced solution is to use multiprocessing.cpu_count() to divide X into a number of chunks, and then merge the results before returning. I leave that as an exercise to the reader so as not to ruin the conciseness of this nice answer. ;) (A minimal sketch of that chunking idea follows the code below.)

from multiprocessing import Process, Pipe
from itertools import izip


def spawn(f):
    def fun(ppipe, cpipe, x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun


def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(p, c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p, c) in pipe]
    [p.join() for p in proc]
    return ret


if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))
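
For what it's worth, a minimal sketch of that chunking exercise, reusing the parmap defined above (my illustration, under that assumption):

from multiprocessing import cpu_count


def parmap_chunked(f, X):
    # Run the one-process-per-element parmap over cpu_count()-sized slices
    # of X, so at most cpu_count() processes are alive at once, then
    # concatenate the partial results in order.
    n = cpu_count()
    results = []
    for start in range(0, len(X), n):
        results.extend(parmap(f, X[start:start + n]))
    return results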

I could not use the code posted so far, because code using "multiprocessing.Pool" does not work with lambda expressions, and code not using "multiprocessing.Pool" spawns as many processes as there are work items.

I adapted the code such that it spawns a predefined number of workers and only iterates through the input list if an idle worker exists. I also enabled "daemon" mode for the workers, so that ctrl-c works as expected.

import multiprocessing


def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))


def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()

    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()

    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]

    [p.join() for p in proc]

    return [x for i, x in sorted(res)]


if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))

Multiprocessing and pickling is broken and limited unless you jump outside the standard library.

If you use a fork of multiprocessing called pathos.multiprocessing, you can directly use classes and class methods in multiprocessing's map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in Python.

pathos.multiprocessing also provides an asynchronous map function, and its map functions can take multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6])).

See the discussion: What can multiprocessing and dill do together? And: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization

It even handles the code you wrote initially, without modification, and from the interpreter. Why do anything else that's more fragile and specific to a single case?

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...  def run(self):
...   def f(x):
...    return x*x
...   p = Pool()
...   return p.map(f, [1,2,3])
...
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]

Get the code here: https://github.com/uqfoundation/pathos

And just to show off a little more of what it can do:

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> p = Pool(4)
>>>
>>> def add(x,y):
...   return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
...   def plus(self, x, y):
...     return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]
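
The reason this works is dill's broader serialization support; a quick standalone check (assumes dill is installed; my illustration, not part of the original answer):

>>> import dill
>>> square = lambda x: x * x      # the stdlib pickle module rejects lambdas
>>> dill.loads(dill.dumps(square))(3)
9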

I've modified klaus se's method, because while it was working for me with small lists, it would hang when the number of items was ~1000 or greater. Instead of pushing the jobs one at a time with the None stop condition, I load up the input queue all at once and just let the processes munch on it until it's empty.

from multiprocessing import cpu_count, Queue, Process


def apply_func(f, q_in, q_out):
    while not q_in.empty():
        i, x = q_in.get()
        q_out.put((i, f(x)))


# map a function using a pool of processes
def parmap(f, X, nprocs=cpu_count()):
    q_in, q_out = Queue(), Queue()
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [p.start() for p in proc]
    res = [q_out.get() for _ in sent]
    [p.join() for p in proc]

    return [x for i, x in sorted(res)]

Edit: unfortunately now I'm running into this error on my system: Multiprocessing Queue maxsize limit is 32767. Hopefully the workarounds there will help (one possibility is sketched below).
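
One possible workaround in that direction (my assumption based on that discussion, not something verified by the original poster) is to swap in Manager-backed queues, which proxy a plain queue living in a server process and so sidestep the platform semaphore cap behind that limit:

import multiprocessing

# Hypothetical drop-in replacement for the Queue() calls in parmap above;
# a Manager-backed queue is not subject to the ~32767 maxsize semaphore
# limit seen on some platforms (e.g. macOS).
manager = multiprocessing.Manager()
q_in = manager.Queue()
q_out = manager.Queue()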

I took klaus se's and agander3's answers and made a documented module that is more readable and holds in one file. You can just add it to your project. It even has an optional progress bar!

"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.


Adapted from http://stackoverflow.com/a/16071616/287297


Example usage:


print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)


Comments:


"It spawns a predefined amount of workers and only iterates through the input list
if there exists an idle worker. I also enabled the "daemon" mode for the workers so
that KeyboardInterrupt works as expected."


Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.


Alternatively, use this fork of multiprocessing:
https://github.com/uqfoundation/multiprocess
"""


# Modules #
import multiprocessing
from tqdm import tqdm


################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
    while not queue_in.empty():
        num, obj = queue_in.get()
        queue_out.put((num, func_to_apply(obj)))


################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
    # Number of processes to use #
    if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
    # Create queues #
    q_in  = multiprocessing.Queue()
    q_out = multiprocessing.Queue()
    # Process list #
    new_proc  = lambda t, a: multiprocessing.Process(target=t, args=a)
    processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
    # Put all the items (objects) in the queue #
    sent = [q_in.put((i, x)) for i, x in enumerate(items)]
    # Start them all #
    for proc in processes:
        proc.daemon = True
        proc.start()
    # Display progress bar or not #
    if verbose:
        results = [q_out.get() for x in tqdm(range(len(sent)))]
    else:
        results = [q_out.get() for x in range(len(sent))]
    # Wait for them to finish #
    for proc in processes: proc.join()
    # Return results #
    return [x for i, x in sorted(results)]


################################################################################
def test():
    def slow_square(x):
        import time
        time.sleep(2)
        return x**2
    objs    = range(20)
    squares = prll_map(slow_square, objs, 4, verbose=True)
    print "Result: %s" % squares

Edit: Added @alexander-mcfarlane's suggestion and a test function.

I'm not sure if this approach has been taken, but a workaround I'm using is:

from multiprocessing import Pool


t = None


def run(n):
    return t.f(n)


class Test(object):
    def __init__(self, number):
        self.number = number

    def f(self, x):
        print x * self.number

    def pool(self):
        pool = Pool(2)
        pool.map(run, range(10))


if __name__ == '__main__':
    t = Test(9)
    t.pool()
    pool = Pool(2)
    pool.map(run, range(10))

Output should be:

0
9
18
27
36
45
54
63
72
81
0
9
18
27
36
45
54
63
72
81
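
Note that this workaround appears to rely on the global t being assigned in __main__ before the Pool spawns its workers, so that forked children inherit the initialized object; under the spawn start method (the default on Windows), the workers would still see t as None.
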
from multiprocessing import Pool


class Calculate(object):
    # Your instance method to be executed
    def f(self, x, y):
        return x*y


if __name__ == '__main__':
    inp_list = [1,2,3]
    y = 2
    cal_obj = Calculate()
    pool = Pool(2)
    # Note: the stdlib pickler cannot serialize lambdas, so this pattern
    # needs a dill-based fork such as pathos/multiprocess to actually run.
    results = pool.map(lambda x: cal_obj.f(x, y), inp_list)

You may want to apply this function for each different instance of the class. Then here is the solution for that as well:

from multiprocessing import Pool


class Calculate(object):
    # Your instance method to be executed
    def __init__(self, x):
        self.x = x

    def f(self, y):
        return self.x*y


if __name__ == '__main__':
    inp_list = [Calculate(i) for i in range(3)]
    y = 2
    pool = Pool(2)
    # As above, lambdas require a dill-based pickler (pathos/multiprocess).
    results = pool.map(lambda x: x.f(y), inp_list)

I know this question was asked 6 years ago, but I just wanted to add my solution, as some of the suggestions above seem horribly complicated, while my solution was actually very simple.

All I had to do was wrap the pool.map() call in a helper function, passing the class object along with the method's args as a tuple, which looked a bit like this.

from multiprocessing import Pool


def run_in_parallel(args):
    # args is an (instance, argument) tuple; calls instance.method(argument)
    return args[0].method(args[1])


myclass = MyClass()  # assumes MyClass defines a picklable method()
method_args = [1,2,3,4,5,6]
args_map = [(myclass, arg) for arg in method_args]
pool = Pool()
pool.map(run_in_parallel, args_map)

Here is my solution, which I think is a bit less hacky than most others here. It is similar to nightowl's answer.

from functools import partial
from multiprocessing import Pool


someclasses = [MyClass(), MyClass(), MyClass()]  # assumes MyClass defines othermethod()


def method_caller(some_object, some_method='the method'):
    return getattr(some_object, some_method)()


othermethod = partial(method_caller, some_method='othermethod')


with Pool(6) as pool:
    result = pool.map(othermethod, someclasses)
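
This plays well with the standard pickler because functools.partial objects pickle cleanly (as long as the wrapped function and its bound arguments do), unlike lambdas.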

From http://www.rueckstiess.net/research/snippets/show/ca1d7d90 and http://qingkaikong.blogspot.com/2016/12/python-parallel-method-in-class.html:

We can create an external function and seed it with the class self object:

from joblib import Parallel, delayed


def unwrap_self(arg, **kwarg):
    return square_class.square_int(*arg, **kwarg)


class square_class:
    def square_int(self, i):
        return i * i

    def run(self, num):
        results = []
        results = Parallel(n_jobs=-1, backend="threading")(
            delayed(unwrap_self)(i) for i in zip([self] * len(num), num))
        print(results)

Or without joblib:

from multiprocessing import Pool
import time


def unwrap_self_f(arg, **kwarg):
    return C.f(*arg, **kwarg)


class C:
    def f(self, name):
        print 'hello %s,' % name
        time.sleep(5)
        print 'nice to meet you.'

    def run(self):
        pool = Pool(processes=2)
        names = ('frank', 'justin', 'osi', 'thomas')
        pool.map(unwrap_self_f, zip([self]*len(names), names))


if __name__ == '__main__':
    c = C()
    c.run()

I know this question was asked 8 years and 10 months ago, but I want to present you my solution:

from multiprocessing import Pool


class Test:

    def __init__(self):
        self.main()

    @staticmethod
    def methodForMultiprocessing(x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()


TestObject = Test()

You just have to make your class function into a static method. It is also possible with a class method:

from multiprocessing import Pool


class Test:

    def __init__(self):
        self.main()

    @classmethod
    def methodForMultiprocessing(cls, x):
        print(x*x)

    def main(self):
        if __name__ == "__main__":
            p = Pool()
            p.map(Test.methodForMultiprocessing, list(range(1, 11)))
            p.close()


TestObject = Test()

Tested in Python 3.7.3.

If you somehow manually exclude the Pool object from the class's list of objects to pickle, you can run your code without any problem, since it is the Pool object that is not pickleable, as the error says. You can do this with the __getstate__ function (look here too), as follows. The Pool object will try to find the __getstate__ and __setstate__ functions and execute them when you run map, map_async, etc:

from multiprocessing import Pool


class calculate(object):
    def __init__(self):
        self.p = Pool()

    def __getstate__(self):
        self_dict = self.__dict__.copy()
        del self_dict['p']
        return self_dict

    def __setstate__(self, state):
        self.__dict__.update(state)

    def f(self, x):
        return x*x

    def run(self):
        return self.p.map(self.f, [1,2,3])

Then do:

cl = calculate()
cl.run()

and it will give you the output:

[1, 4, 9]

I have tested the above code with Python 3.x and it works.

This may not be a very good solution, but in my case, I solved it like this.

from multiprocessing import Pool


def foo1(data):
    self = data.get('slf')
    lst = data.get('lst')
    return sum(lst) + self.foo2()


class Foo(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def foo2(self):
        return self.a**self.b

    def foo(self):
        p = Pool(5)
        lst = [1, 2, 3]
        result = p.map(foo1, (dict(slf=self, lst=lst),))
        return result


if __name__ == '__main__':
    print(Foo(2, 4).foo())

I had to pass self to my function, as I have to access attributes and functions of my class through that function. This is working for me. Corrections and suggestions are always welcome.

Here is a boilerplate I wrote for using a multiprocessing Pool in python3; specifically, python3.7.7 was used to run the tests. I got my fastest runs using imap_unordered. Just plug in your scenario and try it out. You can use timeit or just time.time() to figure out which works best for you.

import multiprocessing
import time


NUMBER_OF_PROCESSES = multiprocessing.cpu_count()
MP_FUNCTION = 'starmap'  # 'imap_unordered' or 'starmap' or 'apply_async'


def process_chunk(a_chunk):
    print(f"processing mp chunk {a_chunk}")
    return a_chunk


map_jobs = [1, 2, 3, 4]

result_sum = 0

s = time.time()
if MP_FUNCTION == 'imap_unordered':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    for i in pool.imap_unordered(process_chunk, map_jobs):
        result_sum += i
elif MP_FUNCTION == 'starmap':
    pool = multiprocessing.Pool(processes=NUMBER_OF_PROCESSES)
    try:
        map_jobs = [(i, ) for i in map_jobs]
        result_sum = pool.starmap(process_chunk, map_jobs)
        result_sum = sum(result_sum)
    finally:
        pool.close()
        pool.join()
elif MP_FUNCTION == 'apply_async':
    with multiprocessing.Pool(processes=NUMBER_OF_PROCESSES) as pool:
        result_sum = [pool.apply_async(process_chunk, [i, ]).get() for i in map_jobs]
        result_sum = sum(result_sum)
print(f"result_sum is {result_sum}, took {time.time() - s}s")

In the above scenario imap_unordered actually performed the worst for me. Try out your case and benchmark it on the machine you plan to run it on. Also read up on Process Pools. Cheers!