捕获线程's异常在调用线程?

我对Python和多线程编程非常陌生。基本上,我有一个脚本,将文件复制到另一个位置。我想把它放在另一个线程中,这样我就可以输出....来表明脚本仍在运行。

我遇到的问题是,如果文件不能复制,它将抛出异常。如果在主线程中运行,这是可以的;但是,使用以下代码是无效的:

try:
threadClass = TheThread(param1, param2, etc.)
threadClass.start()   ##### **Exception takes place here**
except:
print "Caught an exception"

在线程类本身中,我试图重新抛出异常,但它不起作用。我在这里看到有人问类似的问题,但他们似乎都在做一些比我试图做的更具体的事情(我不太理解所提供的解决方案)。我看到有人提到sys.exc_info()的用法,但我不知道在哪里或如何使用它。

线程类的代码如下:

class TheThread(threading.Thread):
def __init__(self, sourceFolder, destFolder):
threading.Thread.__init__(self)
self.sourceFolder = sourceFolder
self.destFolder = destFolder
    

def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except:
raise
266003 次浏览

使用裸例外并不是一个好的实践,因为您通常会获得比您讨价还价时更多的东西。

我建议修改except只捕获你想处理的异常。我不认为引发它有预期的效果,因为当你在外部try中实例化TheThread时,如果它引发异常,则赋值永远不会发生。

相反,你可能只想提醒它,然后继续前进,比如:

def run(self):
try:
shul.copytree(self.sourceFolder, self.destFolder)
except OSError, err:
print err

然后,当异常被捕获时,您可以在那里处理它。然后,当外部tryTheThread捕获异常时,你知道它不是你已经处理过的异常,并将帮助你隔离流程流。

问题是thread_obj.start()立即返回。您所生成的子线程在它自己的上下文中使用自己的堆栈执行。在那里发生的任何异常都在子线程的上下文中,并且在它自己的堆栈中。我现在能想到的一种将此信息传递给父线程的方法是使用某种消息传递,因此您可以研究一下。

试试这个尺寸:

import sys
import threading
import queue




class ExcThread(threading.Thread):


def __init__(self, bucket):
threading.Thread.__init__(self)
self.bucket = bucket


def run(self):
try:
raise Exception('An error occured here.')
except Exception:
self.bucket.put(sys.exc_info())




def main():
bucket = queue.Queue()
thread_obj = ExcThread(bucket)
thread_obj.start()


while True:
try:
exc = bucket.get(block=False)
except queue.Empty:
pass
else:
exc_type, exc_obj, exc_trace = exc
# deal with the exception
print exc_type, exc_obj
print exc_trace


thread_obj.join(0.1)
if thread_obj.isAlive():
continue
else:
break




if __name__ == '__main__':
main()

虽然不可能直接捕获在不同线程中抛出的异常,但下面的代码可以相当透明地获取与此功能非常接近的内容。你的子线程必须子类化ExThread类而不是threading.Thread,父线程在等待线程完成它的工作时必须调用child_thread.join_with_exception()方法而不是child_thread.join()

此实现的技术细节:当子线程抛出异常时,它将通过Queue传递给父线程,并在父线程中再次抛出。注意,在这种方法中没有忙碌等待。

#!/usr/bin/env python


import sys
import threading
import Queue


class ExThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.__status_queue = Queue.Queue()


def run_with_exception(self):
"""This method should be overriden."""
raise NotImplementedError


def run(self):
"""This method should NOT be overriden."""
try:
self.run_with_exception()
except BaseException:
self.__status_queue.put(sys.exc_info())
self.__status_queue.put(None)


def wait_for_exc_info(self):
return self.__status_queue.get()


def join_with_exception(self):
ex_info = self.wait_for_exc_info()
if ex_info is None:
return
else:
raise ex_info[1]


class MyException(Exception):
pass


class MyThread(ExThread):
def __init__(self):
ExThread.__init__(self)


def run_with_exception(self):
thread_name = threading.current_thread().name
raise MyException("An error in thread '{}'.".format(thread_name))


def main():
t = MyThread()
t.start()
try:
t.join_with_exception()
except MyException as ex:
thread_name = threading.current_thread().name
print "Caught a MyException in thread '{}': {}".format(thread_name, ex)


if __name__ == '__main__':
main()

如果在线程中发生异常,最好的方法是在join期间在调用线程中重新引发它。您可以使用sys.exc_info()函数获取当前正在处理的异常的信息。此信息可以简单地存储为线程对象的属性,直到调用join,此时可以重新引发它。

注意,在线程最多抛出1个异常在抛出异常后立即完成的简单情况下,Queue.Queue(在其他回答中建议)是不必要的。我们通过简单地等待线程完成来避免竞争条件。

例如,扩展ExcThread(如下),覆盖excRun(而不是run)。

Python 2. x:

import threading


class ExcThread(threading.Thread):
def excRun(self):
pass


def run(self):
self.exc = None
try:
# Possibly throws an exception
self.excRun()
except:
import sys
self.exc = sys.exc_info()
# Save details of the exception thrown but don't rethrow,
# just complete the function


def join(self):
threading.Thread.join(self)
if self.exc:
msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
new_exc = Exception(msg)
raise new_exc.__class__, new_exc, self.exc[2]

Python 3. x:

raise的参数形式3在Python 3中已经消失,因此将最后一行更改为:

raise new_exc.with_traceback(self.exc[2])

concurrent.futures模块使得在单独的线程(或进程)中工作并处理任何由此产生的异常变得简单:

import concurrent.futures
import shutil


def copytree_with_dots(src_path, dst_path):
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
# Execute the copy on a separate thread,
# creating a future object to track progress.
future = executor.submit(shutil.copytree, src_path, dst_path)


while future.running():
# Print pretty dots here.
pass


# Return the value returned by shutil.copytree(), None.
# Raise any exceptions raised during the copy process.
return future.result()

concurrent.futures包含在Python 3.2中,在早期版本中可作为backport futures模块使用。

作为线程的新手,我花了很长时间来理解如何实现Mateusz Kobos的代码(如上)。这里有一个明确的版本,以帮助您了解如何使用它。

#!/usr/bin/env python


import sys
import threading
import Queue


class ExThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.__status_queue = Queue.Queue()


def run_with_exception(self):
"""This method should be overriden."""
raise NotImplementedError


def run(self):
"""This method should NOT be overriden."""
try:
self.run_with_exception()
except Exception:
self.__status_queue.put(sys.exc_info())
self.__status_queue.put(None)


def wait_for_exc_info(self):
return self.__status_queue.get()


def join_with_exception(self):
ex_info = self.wait_for_exc_info()
if ex_info is None:
return
else:
raise ex_info[1]


class MyException(Exception):
pass


class MyThread(ExThread):
def __init__(self):
ExThread.__init__(self)


# This overrides the "run_with_exception" from class "ExThread"
# Note, this is where the actual thread to be run lives. The thread
# to be run could also call a method or be passed in as an object
def run_with_exception(self):
# Code will function until the int
print "sleeping 5 seconds"
import time
for i in 1, 2, 3, 4, 5:
print i
time.sleep(1)
# Thread should break here
int("str")
# I'm honestly not sure why these appear here? So, I removed them.
# Perhaps Mateusz can clarify?
#         thread_name = threading.current_thread().name
#         raise MyException("An error in thread '{}'.".format(thread_name))


if __name__ == '__main__':
# The code lives in MyThread in this example. So creating the MyThread
# object set the code to be run (but does not start it yet)
t = MyThread()
# This actually starts the thread
t.start()
print
print ("Notice 't.start()' is considered to have completed, although"
" the countdown continues in its new thread. So you code "
"can tinue into new processing.")
# Now that the thread is running, the join allows for monitoring of it
try:
t.join_with_exception()
# should be able to be replace "Exception" with specific error (untested)
except Exception, e:
print
print "Exceptioon was caught and control passed back to the main thread"
print "Do some handling here...or raise a custom exception "
thread_name = threading.current_thread().name
e = ("Caught a MyException in thread: '" +
str(thread_name) +
"' [" + str(e) + "]")
raise Exception(e) # Or custom class of exception, such as MyException

我喜欢的一种方法是基于观察者模式。我定义了一个信号类,线程用它向侦听器发出异常。它还可以用于从线程返回值。例子:

import threading


class Signal:
def __init__(self):
self._subscribers = list()


def emit(self, *args, **kwargs):
for func in self._subscribers:
func(*args, **kwargs)


def connect(self, func):
self._subscribers.append(func)


def disconnect(self, func):
try:
self._subscribers.remove(func)
except ValueError:
raise ValueError('Function {0} not removed from {1}'.format(func, self))




class WorkerThread(threading.Thread):


def __init__(self, *args, **kwargs):
super(WorkerThread, self).__init__(*args, **kwargs)
self.Exception = Signal()
self.Result = Signal()


def run(self):
if self._Thread__target is not None:
try:
self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
except Exception as e:
self.Exception.emit(e)
else:
self.Result.emit(self._return_value)


if __name__ == '__main__':
import time


def handle_exception(exc):
print exc.message


def handle_result(res):
print res


def a():
time.sleep(1)
raise IOError('a failed')


def b():
time.sleep(2)
return 'b returns'


t = WorkerThread(target=a)
t2 = WorkerThread(target=b)
t.Exception.connect(handle_exception)
t2.Result.connect(handle_result)
t.start()
t2.start()


print 'Threads started'


t.join()
t2.join()
print 'Done'

我没有足够的使用线程的经验来断言这是一种完全安全的方法。但这对我来说很管用,我喜欢这种灵活性。

类似于RickardSjogren的方法,没有Queue, sys等,但也没有一些信号监听器:直接执行一个异常处理程序,对应于一个异常块。

#!/usr/bin/env python3


import threading


class ExceptionThread(threading.Thread):


def __init__(self, callback=None, *args, **kwargs):
"""
Redirect exceptions of thread to an exception handler.


:param callback: function to handle occured exception
:type callback: function(thread, exception)
:param args: arguments for threading.Thread()
:type args: tuple
:param kwargs: keyword arguments for threading.Thread()
:type kwargs: dict
"""
self._callback = callback
super().__init__(*args, **kwargs)


def run(self):
try:
if self._target:
self._target(*self._args, **self._kwargs)
except BaseException as e:
if self._callback is None:
raise e
else:
self._callback(self, e)
finally:
# Avoid a refcycle if the thread is running a function with
# an argument that has a member that points to the thread.
del self._target, self._args, self._kwargs, self._callback

只有自我。_callback和run()中的except块是普通threading.Thread之外的。

这个问题有很多非常奇怪复杂的答案。我是不是把它简化了,因为对我来说,这似乎对大多数事情都足够了。

from threading import Thread


class PropagatingThread(Thread):
def run(self):
self.exc = None
try:
if hasattr(self, '_Thread__target'):
# Thread uses name mangling prior to Python 3.
self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
else:
self.ret = self._target(*self._args, **self._kwargs)
except BaseException as e:
self.exc = e


def join(self, timeout=None):
super(PropagatingThread, self).join(timeout)
if self.exc:
raise self.exc
return self.ret

如果你确定你只会在一个或另一个版本的Python上运行,你可以将run()方法减少到仅仅是损坏的版本(如果你只在3之前的Python版本上运行),或者仅仅是干净的版本(如果你只在3开始的Python版本上运行)。

使用示例:

def f(*args, **kwargs):
print(args)
print(kwargs)
raise Exception('I suck at this')


t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

当您加入时,您将看到在另一个线程上引发异常。

如果你正在使用six或仅在Python 3上使用,你可以改进重新引发异常时获得的堆栈跟踪信息。您可以将内部异常包装在一个新的外部异常中,而不是仅在连接点处使用堆栈,并使用

six.raise_from(RuntimeError('Exception in thread'),self.exc)

raise RuntimeError('Exception in thread') from self.exc

这是一个棘手的小问题,我想提出我的解决方案。我发现了一些其他的解决方案(异步。例如IO)看起来很有前途,但也呈现出一些黑盒子。队列/事件循环方法将您与某个实现联系在一起。然而,并发期货的源代码只有大约1000行,很容易理解。它让我很容易地解决了我的问题:创建临时的工作线程,而不需要太多的设置,并且能够在主线程中捕获异常。

我的解决方案使用并发期货API和线程API。它允许你创建一个worker,给你线程和未来。这样,你就可以加入线程来等待结果:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

...或者你可以让worker在完成时发送一个回调:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

...或者你可以循环直到事件完成:

worker = Worker(test)
thread = worker.start()


while True:
print("waiting")
if worker.future.done():
exc = worker.future.exception()
print('exception?', exc)
result = worker.future.result()
print('result', result)
break
time.sleep(0.25)

代码如下:

from concurrent.futures import Future
import threading
import time


class Worker(object):
def __init__(self, fn, args=()):
self.future = Future()
self._fn = fn
self._args = args


def start(self, cb=None):
self._cb = cb
self.future.set_running_or_notify_cancel()
thread = threading.Thread(target=self.run, args=())
thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
thread.start()
return thread


def run(self):
try:
self.future.set_result(self._fn(*self._args))
except BaseException as e:
self.future.set_exception(e)


if(self._cb):
self._cb(self.future.result())

...和测试函数:

def test(*args):
print('args are', args)
time.sleep(2)
raise Exception('foo')

捕捉线程异常并与调用方方法通信的一个简单方法是将字典或列表传递给worker方法。

示例(将字典传递给工作方法):

import threading


def my_method(throw_me):
raise Exception(throw_me)


def worker(shared_obj, *args, **kwargs):
try:
shared_obj['target'](*args, **kwargs)
except Exception as err:
shared_obj['err'] = err


shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"


th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()


if shared_obj['err']:
print(">>%s" % shared_obj['err'])

我知道我在这里有点晚了,但我有一个非常类似的问题,但它包括使用tkinter作为GUI,并且主循环使它不可能使用依赖于.join()的任何解决方案。因此,我调整了原问题EDIT中给出的解决方案,但使其更一般,以便于其他人更容易理解。

下面是运行中的新线程类:

import threading
import traceback
import logging




class ExceptionThread(threading.Thread):
def __init__(self, *args, **kwargs):
threading.Thread.__init__(self, *args, **kwargs)


def run(self):
try:
if self._target:
self._target(*self._args, **self._kwargs)
except Exception:
logging.error(traceback.format_exc())




def test_function_1(input):
raise IndexError(input)




if __name__ == "__main__":
input = 'useful'


t1 = ExceptionThread(target=test_function_1, args=[input])
t1.start()

当然,您总是可以让它以日志以外的其他方式处理异常,例如将其打印出来,或将其输出到控制台。

这允许您像使用Thread类一样使用ExceptionThread类,无需任何特殊修改。

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

解决方案如下:

  • 当调用异常时,立即返回主线程
  • 不需要额外的用户定义类,因为它不需要:
    • 显式的Queue
    • 在工作线程周围添加except else

来源:

#!/usr/bin/env python3


import concurrent.futures
import time


def func_that_raises(do_raise):
for i in range(3):
print(i)
time.sleep(0.1)
if do_raise:
raise Exception()
for i in range(3):
print(i)
time.sleep(0.1)
    

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
futures = []
futures.append(executor.submit(func_that_raises, False))
futures.append(executor.submit(func_that_raises, True))
for future in concurrent.futures.as_completed(futures):
print(repr(future.exception()))

可能的输出:

0
0
1
1
2
2
0
Exception()
1
2
None

不幸的是,当一个期货失效时,不可能通过终止期货来取消其他期货:

如果你这样做:

for future in concurrent.futures.as_completed(futures):
if future.exception() is not None:
raise future.exception()

然后with捕获它,并在继续之前等待第二个线程完成。以下行为类似:

for future in concurrent.futures.as_completed(futures):
future.result()

因为future.result()会在异常发生时重新引发异常。

如果你想退出整个Python进程,你可能会使用os._exit(0),但这可能意味着你需要重构。

具有完美异常语义的自定义类

我最终为自己编写了一个完美的接口:限制一次运行的最大线程数的正确方法是什么?节“带有错误处理的队列示例”。该类的目标是既方便,又让您完全控制提交和结果/错误处理。

在Python 3.6.7, Ubuntu 18.04上测试。

使用异常存储包装线程。

import threading
import sys
class ExcThread(threading.Thread):


def __init__(self, target, args = None):
self.args = args if args else []
self.target = target
self.exc = None
threading.Thread.__init__(self)


def run(self):
try:
self.target(*self.args)
raise Exception('An error occured here.')
except Exception:
self.exc=sys.exc_info()


def main():
def hello(name):
print(!"Hello, {name}!")
thread_obj = ExcThread(target=hello, args=("Jack"))
thread_obj.start()


thread_obj.join()
exc = thread_obj.exc
if exc:
exc_type, exc_obj, exc_trace = exc
print(exc_type, ':',exc_obj, ":", exc_trace)


main()

pygolang提供了同步。工作组,特别是将异常从派生的工作线程传播到主线程。例如:

#!/usr/bin/env python
"""This program demostrates how with sync.WorkGroup an exception raised in
spawned thread is propagated into main thread which spawned the worker."""


from __future__ import print_function
from golang import sync, context


def T1(ctx, *argv):
print('T1: run ... %r' % (argv,))
raise RuntimeError('T1: problem')


def T2(ctx):
print('T2: ran ok')


def main():
wg = sync.WorkGroup(context.background())
wg.go(T1, [1,2,3])
wg.go(T2)


try:
wg.wait()
except Exception as e:
print('Tmain: caught exception: %r\n' %e)
# reraising to see full traceback
raise


if __name__ == '__main__':
main()

在运行时给出以下结果:

T1: run ... ([1, 2, 3],)
T2: ran ok
Tmain: caught exception: RuntimeError('T1: problem',)


Traceback (most recent call last):
File "./x.py", line 28, in <module>
main()
File "./x.py", line 21, in main
wg.wait()
File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
pyerr_reraise(pyerr)
File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
f(pywg._pyctx, *argv, **kw)
File "./x.py", line 10, in T1
raise RuntimeError('T1: problem')
RuntimeError: T1: problem

问题的原始代码将是:

    wg = sync.WorkGroup(context.background())


def _(ctx):
shul.copytree(sourceFolder, destFolder)
wg.go(_)


# waits for spawned worker to complete and, on error, reraises
# its exception on the main thread.
wg.wait()

在Python 3.8中,我们可以使用< >强threading.excepthook < / >强来钩住所有子线程!例如,

threading.excepthook = thread_exception_handler

推荐人:https://stackoverflow.com/a/60002752/5093308

我做的是,简单的覆盖连接和运行线程的方法:

class RaisingThread(threading.Thread):
def run(self):
self._exc = None
try:
super().run()
except Exception as e:
self._exc = e


def join(self, timeout=None):
super().join(timeout=timeout)
if self._exc:
raise self._exc

用途如下:

def foo():
time.sleep(2)
print('hi, from foo!')
raise Exception('exception from foo')


t = RaisingThread(target=foo)
t.start()
try:
t.join()
except Exception as e:
print(e)

结果:

hi, from foo!
exception from foo!

我认为其他的解决方案有点复杂,如果你唯一想要的是真正看到某个异常,而不是完全无视和盲目。

解决方案是创建一个自定义Thread,它从主线程中获取记录器并记录任何异常。

class ThreadWithLoggedException(threading.Thread):
"""
Similar to Thread but will log exceptions to passed logger.


Args:
logger: Logger instance used to log any exception in child thread


Exception is also reachable via <thread>.exception from the main thread.
"""


def __init__(self, *args, **kwargs):
try:
self.logger = kwargs.pop("logger")
except KeyError:
raise Exception("Missing 'logger' in kwargs")
super().__init__(*args, **kwargs)
self.exception = None


def run(self):
try:
if self._target is not None:
self._target(*self._args, **self._kwargs)
except Exception as exception:
thread = threading.current_thread()
self.exception = exception
self.logger.exception(f"Exception in child thread {thread}: {exception}")
finally:
del self._target, self._args, self._kwargs

例子:

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


def serve():
raise Exception("Earth exploded.")


th = ThreadWithLoggedException(target=serve, logger=logger)
th.start()


主线程输出:

Exception in child thread <ThreadWithLoggedException(Thread-1, started 139922384414464)>: Earth exploded.
Traceback (most recent call last):
File "/core/utils.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/myapp.py", line 105, in serve
raise Exception("Earth exploded.")
Exception: Earth exploded.


我使用这个版本,它是最小的,它工作得很好。

class SafeThread(threading.Thread):
def __init__(self, *args, **kwargs):
super(SafeThread, self).__init__(*args, **kwargs)
self.exception = None


def run(self) -> None:
try:
super(SafeThread, self).run()
except Exception as ex:
self.exception = ex
traceback.print_exc()


def join(self, *args, **kwargs) -> None:
super(SafeThread, self).join(*args, **kwargs)
if self.exception:
raise self.exception

要使用它,只需将threading.Thread替换为SafeThread

t = SafeThread(target = some_function, args = (some, args,))
t.start()
# do something else here if you want as the thread runs in the background
t.join()

我喜欢这门课:

https://gist.github.com/earonesty/b88d60cb256b71443e42c4f1d949163e

import threading
from typing import Any




class PropagatingThread(threading.Thread):
"""A Threading Class that raises errors it caught, and returns the return value of the target on join."""


def __init__(self, *args, **kwargs):
self._target = None
self._args = ()
self._kwargs = {}
super().__init__(*args, **kwargs)
self.exception = None
self.return_value = None
assert self._target


def run(self):
"""Don't override this if you want the behavior of this class, use target instead."""
try:
if self._target:
self.return_value = self._target(*self._args, **self._kwargs)
except Exception as e:
self.exception = e
finally:
# see super().run() for why this is necessary
del self._target, self._args, self._kwargs


def join(self, timeout=None) -> Any:
super().join(timeout)
if self.exception:
raise self.exception
return self.return_value