未检测到在多处理池中引发的异常

当从多处理引发异常时,似乎。池进程,则没有堆栈跟踪或任何其他指示它已失败的迹象。例如:

from multiprocessing import Pool


def go():
print(1)
raise Exception()
print(2)


p = Pool()
p.apply_async(go)
p.close()
p.join()

打印1并静止。有趣的是,引发 BaseException 反而有效。有没有办法使所有异常的行为与 BaseException 相同?

72434 次浏览

我会尝试使用 pdb:

import pdb
import sys
def handler(type, value, tb):
pdb.pm()
sys.excepthook = handler

对于这个问题,我有一个合理的解决方案,至少是出于调试的目的。我目前还没有一个解决方案可以在主进程中引发异常。我的第一个想法是使用一个装饰,但你只能腌制 在模块的顶层定义的函数,所以这是正确的。

取而代之的是一个简单的包装类和一个 Pool 子类,它们对 apply_async(因此也是 apply)使用这个类。我将把 map_async作为一个练习留给读者。

import traceback
from multiprocessing.pool import Pool
import multiprocessing


# Shortcut to multiprocessing's logger
def error(msg, *args):
return multiprocessing.get_logger().error(msg, *args)


class LogExceptions(object):
def __init__(self, callable):
self.__callable = callable


def __call__(self, *args, **kwargs):
try:
result = self.__callable(*args, **kwargs)


except Exception as e:
# Here we add some debugging help. If multiprocessing's
# debugging is on, it will arrange to log the traceback
error(traceback.format_exc())
# Re-raise the original exception so the Pool worker can
# clean up
raise


# It was fine, give a normal answer
return result


class LoggingPool(Pool):
def apply_async(self, func, args=(), kwds={}, callback=None):
return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)


def go():
print(1)
raise Exception()
print(2)


multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)


p.apply_async(go)
p.close()
p.join()

这给了我:

1
[ERROR/PoolWorker-1] Traceback (most recent call last):
File "mpdebug.py", line 24, in __call__
result = self.__callable(*args, **kwargs)
File "mpdebug.py", line 44, in go
raise Exception()
Exception

也许我遗漏了什么,但是这不正是 Result 对象的 get方法返回的内容吗。

类 multiprocessing.pool. AsyncResult

Pool.Apply _ sync ()和 Pool.map _ sync () . get ([ timeout ])返回的结果类
当它到达时返回结果。如果超时不是无,并且结果没有到达 超时秒数,然后多处理。引发 TimeoutError。如果远程 调用引发一个异常,然后通过 get ()重新引发该异常。

所以,稍微修改一下你的例子

from multiprocessing import Pool


def go():
print(1)
raise Exception("foobar")
print(2)


p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()

结果就是

1
Traceback (most recent call last):
File "rob.py", line 10, in <module>
x.get()
File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
raise self._value
Exception: foobar

这并不完全令人满意,因为它不打印回溯,但总比什么都不打印好。

更新: 这个 bug 已经在 Python 3.4中修复,由 Richard Oudkerk 提供。

我创建了一个模块 Remoteexception.py,它显示了进程中异常的完整回溯。蟒蛇2。下载下来并将其添加到代码中:

import RemoteException


@RemoteException.showError
def go():
raise Exception('Error!')


if __name__ == '__main__':
import multiprocessing
p = multiprocessing.Pool(processes = 1)
r = p.apply(go) # full traceback is shown here

我已经成功地记录了这个装饰器的异常:

import traceback, functools, multiprocessing


def trace_unhandled_exceptions(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
try:
func(*args, **kwargs)
except:
print 'Exception in '+func.__name__
traceback.print_exc()
return wrapped_func

在问题中的代码,它是

@trace_unhandled_exceptions
def go():
print(1)
raise Exception()
print(2)


p = multiprocessing.Pool(1)


p.apply_async(go)
p.close()
p.join()

只需简单地装饰传递给流程池的函数。这个工作的关键是 @functools.wraps(func),否则多处理抛出一个 PicklingError

上述代码给出

1
Exception in go
Traceback (most recent call last):
File "<stdin>", line 5, in wrapped_func
File "<stdin>", line 4, in go
Exception

在撰写本文时得票最多的解决方案存在一个问题:

from multiprocessing import Pool


def go():
print(1)
raise Exception("foobar")
print(2)


p = Pool()
x = p.apply_async(go)
x.get()  ## waiting here for go() to complete...
p.close()
p.join()

正如@dfrankow 指出的,它将等待 x.get(),这会破坏异步运行任务的要点。因此,为了提高效率(特别是如果您的工作函数 go需要很长时间) ,我将它改为:

from multiprocessing import Pool


def go(x):
print(1)
# task_that_takes_a_long_time()
raise Exception("Can't go anywhere.")
print(2)
return x**2


p = Pool()
results = []
for x in range(1000):
results.append( p.apply_async(go, [x]) )


p.close()


for r in results:
r.get()

优点 : worker 函数是异步运行的,因此,例如,如果您在几个核上运行许多任务,它将比原始解决方案高效得多。

缺点 : 如果 worker 函数中有一个异常,它只会被提升为 之后,池已经完成了所有的任务。这可能是,也可能不是可取的行为。根据@colinfang 的评论编辑,修正了这个问题。

既然您已经使用了 apply_sync,我猜测用例是希望执行一些同步任务。使用回调处理是另一种选择。请注意,此选项仅适用于 python3.2及以上版本,而不适用于 python2.7。

from multiprocessing import Pool


def callback(result):
print('success', result)


def callback_error(result):
print('error', result)


def go():
print(1)
raise Exception()
print(2)


p = Pool()
p.apply_async(go, callback=callback, error_callback=callback_error)


# You can do another things


p.close()
p.join()
import logging
from multiprocessing import Pool


def proc_wrapper(func, *args, **kwargs):
"""Print exception because multiprocessing lib doesn't return them right."""
try:
return func(*args, **kwargs)
except Exception as e:
logging.exception(e)
raise


def go(x):
print x
raise Exception("foobar")


p = Pool()
p.apply_async(proc_wrapper, (go, 5))
p.join()
p.close()

由于 multiprocessing.Pool已经有不错的答案可用,我将提供一个解决方案使用不同的方法来完成。

对于 python >= 3.2来说,下面的解决方案似乎是最简单的:

from concurrent.futures import ProcessPoolExecutor, wait


def go():
print(1)
raise Exception()
print(2)




futures = []
with ProcessPoolExecutor() as p:
for i in range(10):
futures.append(p.submit(go))


results = [f.result() for f in futures]

优点:

  • 很少的代码
  • 在主进程中引发异常
  • 提供堆栈跟踪
  • 没有外部依赖性

关于 API 的更多信息,请查看 这个

此外,如果您提交了大量任务,并希望主流程在其中一个任务失败后立即失败,可以使用以下代码片段:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed
import time




def go():
print(1)
time.sleep(0.3)
raise Exception()
print(2)




futures = []
with ProcessPoolExecutor(1) as p:
for i in range(10):
futures.append(p.submit(go))


for f in as_completed(futures):
if f.exception() is not None:
for f in futures:
f.cancel()
break


[f.result() for f in futures]

只有在所有任务都执行完毕之后,所有其他答案才会失败。