如何在线程之间传播异常?

我们有一个单线程调用的函数(我们将其命名为主线程)。在函数体中,我们生成多个工作线程来执行 CPU 密集型工作,等待所有线程完成,然后在主线程上返回结果。

其结果是,调用方可以天真地使用函数,并在内部使用多个核。

目前一切正常。

我们的问题是处理异常。我们不希望工作线程上的异常导致应用程序崩溃。我们希望函数的调用方能够在主线程上捕获它们。我们必须捕获辅助线程上的异常,并将它们传播到主线程,以使它们继续从那里解除。

我们要怎么做?

我能想到的最好的办法就是:

  1. 在我们的工作线程上捕获各种各样的异常(std: : Exception 和一些我们自己的异常)。
  2. Record the type and message of the exception.
  3. 在主线程上有一个相应的 switch 语句,该语句重新引发工作线程上记录的任何类型的异常。

这有一个明显的缺点,那就是只支持有限的一组异常类型,而且每当添加新的异常类型时都需要进行修改。

53708 次浏览

实际上,没有好的通用方法可以将异常从一个线程传输到下一个线程。

如果您的所有异常都源自 std: : Exception,那么您可以拥有一个顶级的一般异常捕获,它会以某种方式将异常发送到主线程,然后再次抛出异常。问题是您丢失了异常的抛出点。您可以编写依赖于编译器的代码来获取这些信息并传输它们。

如果不是所有的异常都继承了 std: : Exception,那么您就有麻烦了,必须在线程中编写大量顶级 catch... 但是解决方案仍然有效。

您能否序列化辅助线程中的异常,将其传输回主线程,反序列化,然后再次抛出?我希望为了这个工作,所有异常都必须从同一个类派生(或者至少再次使用 switch 语句的一小组类)。而且,我也不确定它们是否可以连载,我只是在自言自语。

您需要为 worker 中的所有异常(包括非标准异常,比如访问违规)执行一个通用捕获,并从 worker 线程发送一条消息(我想您已经准备好了某种消息?)包含指向该异常的活动指针的控制线程,并通过创建该异常的副本在该线程中重新引发。 然后工作者可以释放原始对象并退出。

您的问题是,您可能从多个线程接收到多个异常,因为每个异常都可能失败,可能是由于不同的原因。

I am assuming the main thread is somehow waiting for the threads to end to retrieve the results, or checking regularly the other threads' progress, and that access to shared data is synchronized.

简单的解决办法

简单的解决方案是捕获每个线程中的所有异常,将它们记录在一个共享变量中(在主线程中)。

所有线程完成后,决定如何处理异常。这意味着所有其他线程继续它们的处理,这可能不是您想要的。

Complex solution

更复杂的解决方案是,如果异常从另一个线程抛出,则让每个线程在其执行的战略点进行检查。

If a thread throws an exception, it is caught before exiting the thread, the exception object is copied into some container in the main thread (as in the simple solution), and some shared boolean variable is set to true.

当另一个线程测试这个布尔值时,它看到执行将被中止,并以一种优雅的方式中止。

当所有线程都中止时,主线程可以根据需要处理异常。

从线程引发的异常不能在父线程中捕获。线程有不同的上下文和堆栈,通常不需要父线程停留在那里等待子线程完成,这样它就可以捕获它们的异常。这种捕捉在代码中根本没有位置:

try
{
start thread();
wait_finish( thread );
}
catch(...)
{
// will catch exceptions generated within start and wait,
// but not from the thread itself
}

您将需要捕获每个线程内部的异常,并解释主线程中线程的退出状态,以重新抛出可能需要的任何异常。

顺便说一句,在线程中没有 catch 的情况下,如果堆栈展开完成,那么它就是特定于实现的,也就是说,在调用 end 之前,可能甚至不会调用自动变量的析构函数。有些编译器会这样做,但不是必需的。

目前,唯一的 便携式的方法是为您可能希望在线程之间传输的所有类型的异常编写 catch 子句,将该 catch 子句中的信息存储在某个地方,然后在以后使用它重新抛出异常。这是 Boost.Exception采用的方法。

在 C + + 0x 中,可以使用 catch(...)捕获异常,然后使用 std::current_exception()将其存储在 std::exception_ptr的实例中。然后,您可以使用 std::rethrow_exception()从相同或不同的线程重新抛出它。

如果您使用的是 MicrosoftVisualStudio2005或更高版本,则 : thread C + + 0x 线程库支持 std::exception_ptr

See http://www.boost.org/doc/libs/release/libs/exception/doc/tutorial_exception_ptr.html. It is also possible to write a wrapper function of whatever function you call to join a child thread, which automatically re-throws (using boost::rethrow_exception) any exception emitted by a child thread.

如果你使用的是 C + + 11,那么 std::future可能完全符合你的要求: 它可以自动捕获异常,使其到工作线程的顶部,并在 std::future::get被调用的时候将它们传递给父线程。(在幕后,这与@Anthony Williams 的回答完全一样; 它已经为您实现了。)

缺点是没有标准的方法来“停止关心”std::future; 甚至它的析构函数也只是在任务完成之前阻塞。这里有一个玩具的例子可以解释我的意思:

#include <atomic>
#include <chrono>
#include <exception>
#include <future>
#include <thread>
#include <vector>
#include <stdio.h>


bool is_prime(int n)
{
if (n == 1010) {
puts("is_prime(1010) throws an exception");
throw std::logic_error("1010");
}
/* We actually want this loop to run slowly, for demonstration purposes. */
std::this_thread::sleep_for(std::chrono::milliseconds(100));
for (int i=2; i < n; ++i) { if (n % i == 0) return false; }
return (n >= 2);
}


int worker()
{
static std::atomic<int> hundreds(0);
const int start = 100 * hundreds++;
const int end = start + 100;
int sum = 0;
for (int i=start; i < end; ++i) {
if (is_prime(i)) { printf("%d is prime\n", i); sum += i; }
}
return sum;
}


int spawn_workers(int N)
{
std::vector<std::future<int>> waitables;
for (int i=0; i < N; ++i) {
std::future<int> f = std::async(std::launch::async, worker);
waitables.emplace_back(std::move(f));
}


int sum = 0;
for (std::future<int> &f : waitables) {
sum += f.get();  /* may throw an exception */
}
return sum;
/* But watch out! When f.get() throws an exception, we still need
* to unwind the stack, which means destructing "waitables" and each
* of its elements. The destructor of each std::future will block
* as if calling this->wait(). So in fact this may not do what you
* really want. */
}


int main()
{
try {
int sum = spawn_workers(100);
printf("sum is %d\n", sum);
} catch (std::exception &e) {
/* This line will be printed after all the prime-number output. */
printf("Caught %s\n", e.what());
}
}

我只是尝试使用 std::threadstd::exception_ptr编写一个类似工作的示例,但是 std::exception_ptr出了问题(使用 libc + +) ,所以我还没有让它真正工作。:(

编辑,2017:

int main() {
std::exception_ptr e;
std::thread t1([&e](){
try {
::operator new(-1);
} catch (...) {
e = std::current_exception();
}
});
t1.join();
try {
std::rethrow_exception(e);
} catch (const std::bad_alloc&) {
puts("Success!");
}
}

我不知道自己在2013年做错了什么,但我肯定那是我的错。]

C + + 11引入了 exception_ptr类型,该类型允许在线程之间传输异常:

#include<iostream>
#include<thread>
#include<exception>
#include<stdexcept>


static std::exception_ptr teptr = nullptr;


void f()
{
try
{
std::this_thread::sleep_for(std::chrono::seconds(1));
throw std::runtime_error("To be passed between threads");
}
catch(...)
{
teptr = std::current_exception();
}
}


int main(int argc, char **argv)
{
std::thread mythread(f);
mythread.join();


if (teptr) {
try{
std::rethrow_exception(teptr);
}
catch(const std::exception &ex)
{
std::cerr << "Thread exited with exception: " << ex.what() << "\n";
}
}


return 0;
}

因为在您的情况下,您有多个工作线程,您需要为每个工作线程保留一个 exception_ptr

Note that exception_ptr is a shared ptr-like pointer, so you will need to keep at least one exception_ptr pointing to each exception or they will be released.

Microsoft 特定: 如果您使用 SEH 异常(/EHa) ,示例代码还将传输 SEH 异常,如访问违规,这可能不是您想要的。