Python multiprocessing PicklingError: Can't pickle <type 'function'>

Unfortunately I cannot reproduce the error with a simpler example, and my code is too complicated to post. If I run the program in an IPython shell instead of the regular Python, things work out well.

I looked up some previous notes on this problem. All of them were caused by using pool to call a function defined within a class function. But this is not the case for me.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I would appreciate any help.

Update: the function I pickle is defined at the top level of the module, though it calls a function that contains a nested function. That is, f() calls g(), which calls h(), which has a nested function i(), and I am calling pool.apply_async(f). f(), g(), and h() are all defined at the top level. I tried a simpler example with this pattern, though, and it works.
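For reference, here is a minimal sketch of the pattern described above (the names f, g, h, and i are taken from the description); it runs without a PicklingError, since only the top-level f is pickled, not the functions it calls:

import multiprocessing as mp

def h():
    def i():          # nested function: not picklable itself,
        return 42     # but h is only *called*, never pickled
    return i()

def g():
    return h()

def f():
    return g()

if __name__ == '__main__':
    pool = mp.Pool()
    result = pool.apply_async(f)   # only the top-level f is pickled
    print(result.get())            # 42
    pool.close()
    pool.join()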


Here is a list of what can be pickled. In particular, functions are only picklable if they are defined at the top level of a module.
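You can see this rule with plain pickle, no pool involved (a minimal sketch; the exact error text differs between Python 2 and 3):

import pickle

def outer():
    def inner():          # defined inside another function, not at the top level
        pass
    return inner

pickle.dumps(outer)       # works: outer is a top-level function
pickle.dumps(outer())     # fails: inner is not reachable at the top level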

This code:

import multiprocessing as mp


class Foo():
    @staticmethod
    def work(self):
        pass


if __name__ == '__main__':
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

yields an error almost identical to the one you posted:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

The problem is that the pool methods all use an mp.SimpleQueue to pass tasks to the worker processes. Everything that goes through the mp.SimpleQueue must be picklable, and foo.work is not picklable since it is not defined at the top level of the module.
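You can reproduce the failure without the pool by pickling the method directly (a minimal sketch, reusing foo from the snippet above; the exact message depends on the pickle implementation and Python version):

import pickle

pickle.dumps(foo.work)   # raises PicklingError: the function cannot be
                         # looked up as a top-level attribute of the module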

It can be fixed by defining a function at the top level that calls foo.work():

def work(foo):
    foo.work()


pool.apply_async(work, args=(foo,))

Notice that foo is picklable, since Foo is defined at the top level and foo.__dict__ is picklable.
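A quick sanity check of the fix (a minimal sketch, reusing work and foo from above):

import pickle

pickle.dumps(foo)             # OK: Foo is top-level and foo.__dict__ is picklable
pickle.dumps(work)            # OK: work is a top-level function
pickle.dumps((work, (foo,)))  # OK: essentially what the pool sends to a worker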

I found I could also generate exactly that error output on a perfectly working piece of code by attempting to use the profiler on it.

Note that this was on Windows (where the forking is a bit less elegant).

I was running:

python -m profile -o output.pstats <script>

and found that removing the profiling removed the error, while placing it back restored the error. It was driving me batty too, because I knew the code used to work. I was checking whether something had updated pool.py... then got a sinking feeling, removed the profiling, and that was it.

Posting it here for the archives in case anybody else runs into it.

I'd use pathos.multiprocessing instead of multiprocessing. pathos.multiprocessing is a fork of multiprocessing that uses dill. dill can serialize almost anything in python, so you are able to send a lot more around in parallel. The pathos fork also has the ability to work directly with multiple-argument functions, as you need for class methods.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y):
...     return x+y
...
>>> t = Test()
>>> x = [0, 1, 2, 3]   # example inputs; not shown in the original session
>>> y = [4, 5, 6, 7]
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>>
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
...
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

Get pathos (and, if you like, dill) here: https://github.com/uqfoundation

As others have said, multiprocessing can only transfer Python objects to worker processes that can be pickled. If you cannot reorganize your code as described by unutbu, you can use dill's extended pickling/unpickling capabilities for transferring data (especially code data), as I show below.

This solution requires only the installation of dill and no other libraries such as pathos:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # async execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()

This solution, too, requires only the installation of dill and no other libraries such as pathos:

import dill


def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to an object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It takes the target function as an argument, dumps it («with dill»)
    and returns the dumped function with the arguments of the target function.
    For more performance we dump only the target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png',
        ~...         **options
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function that you want to execute like target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returns a tuple with
        * a function wrapper that unpacks and calls the target function;
        * a list of the packed target function and its arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

It also works for numpy arrays.
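For completeness, a hypothetical usage sketch (Python 2, matching the tuple-parameter syntax above); the lambda would normally be unpicklable:

if __name__ == '__main__':
    import multiprocessing
    pool = multiprocessing.Pool(4)
    # Map an (otherwise unpicklable) lambda over a list of items;
    # each item is sent together with the dill-dumped function.
    results = pool.map(*pack_function_for_map(lambda item, offset: item + offset,
                                              [1, 2, 3, 4],
                                              10))
    print results  # [11, 12, 13, 14]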

Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

This error will also come if you have any inbuilt function inside the model object that was passed to the async job.

So make sure to check that the model objects that are passed don't have inbuilt functions. (In our case we were using the FieldTracker() function of django-model-utils inside the model to track a certain field.) Here is a link to the relevant GitHub issue.

Building on @rocksportrocker's solution, it would make sense to dill when sending and receiving the results as well:

import dill
import itertools


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res


def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)


if __name__ == '__main__':
    import multiprocessing as mp
    import sys, os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x: [sys.stdout.write('%s\n' % os.getpid()), x][-1],
                         [lambda x: x + 1] * 10,)
    res = res.get(timeout=100)
    res = map(dill.loads, res)
    print(res)

When this problem comes up with multiprocessing, a simple solution is to switch from Pool to ThreadPool. This can be done with no change of code other than the import:

from multiprocessing.pool import ThreadPool as Pool

This is because ThreadPool shares memory with the main thread, rather than creating a new process - this means that pickling is not required.

The downside to this method is that python isn't the greatest language at handling threads - it uses something called the Global Interpreter Lock to stay thread-safe, which can slow down some use cases here. However, if you're primarily interacting with other systems (running HTTP commands, talking with a database, writing to filesystems), then your code is likely not bound by CPU and won't take much of a hit. In fact, I've found when writing HTTP/HTTPS benchmarks that the threaded model used here has less overhead and delay, since the overhead of creating new processes is much higher than that of creating new threads, and the program was otherwise just waiting for HTTP responses.

So if you're processing a ton of stuff in python userspace, this might not be the best method.
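A minimal sketch of the switch (the Downloader class is just a stand-in; its bound method would make a regular Pool raise the PicklingError on Python 2, but runs fine with threads):

from multiprocessing.pool import ThreadPool as Pool


class Downloader:
    def fetch(self, url):
        # placeholder for I/O-bound work (HTTP, database, filesystem)
        return len(url)


if __name__ == '__main__':
    d = Downloader()
    pool = Pool(4)
    # Bound methods need no pickling here: threads share memory.
    print(pool.map(d.fetch, ['http://a', 'http://bb', 'http://ccc']))
    pool.close()
    pool.join()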

As @penky Suresh suggested in this answer, don't use built-in keywords.

Apparently args is a built-in keyword when dealing with multiprocessing:


from concurrent.futures import ProcessPoolExecutor, as_completed


class TTS:
    def __init__(self):
        pass

    def process_and_render_items(self):
        multiprocessing_args = [{"a": "b", "c": "d"}, {"e": "f", "g": "h"}]

        with ProcessPoolExecutor(max_workers=10) as executor:
            # Using args here is fine.
            future_processes = {
                executor.submit(TTS.process_and_render_item, args)
                for args in multiprocessing_args
            }

            for future in as_completed(future_processes):
                try:
                    data = future.result()
                except Exception as exc:
                    print(f"Generated an exception: {exc}")
                else:
                    print(f"Generated data for comment process: {future}")


# Don't use 'args' here. It seems to be a built-in keyword.
# Changing 'args' to 'arg' worked for me.
def process_and_render_item(arg):
    print(arg)
    # This will print {"a": "b", "c": "d"} for the first process
    # and {"e": "f", "g": "h"} for the second process.
PS: The tabs/spaces may be a bit off.

A quick fix is to make the function global:

from multiprocessing import Pool


class Test:
    def __init__(self, x):
        self.x = x

    @staticmethod
    def test(x):
        return x**2

    def test_apply(self, list_):
        global r

        def r(x):
            return Test.test(x + self.x)

        with Pool() as p:
            l = p.map(r, list_)

        return l


if __name__ == '__main__':
    o = Test(2)
    print(o.test_apply(range(10)))