处理来自Java ExecutorService任务的异常

我试图使用Java的ThreadPoolExecutor类运行大量具有固定数量线程的重量级任务。每个任务都有许多可能由于异常而失败的地方。

我已经继承了ThreadPoolExecutor的子类,并重写了afterExecute方法,该方法应该提供在运行任务时遇到的任何未捕获的异常。然而,我似乎不能让它工作。

例如:

public class ThreadPoolErrors extends ThreadPoolExecutor {
public ThreadPoolErrors() {
super(  1, // core threads
1, // max threads
1, // timeout
TimeUnit.MINUTES, // timeout units
new LinkedBlockingQueue<Runnable>() // work queue
);
}


protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if(t != null) {
System.out.println("Got an error: " + t);
} else {
System.out.println("Everything's fine--situation normal!");
}
}


public static void main( String [] args) {
ThreadPoolErrors threadPool = new ThreadPoolErrors();
threadPool.submit(
new Runnable() {
public void run() {
throw new RuntimeException("Ouch! Got an error.");
}
}
);
threadPool.shutdown();
}
}

这个程序的输出是“一切正常——情况正常!”,尽管提交给线程池的唯一Runnable抛出了异常。你知道这里发生了什么吗?

谢谢!

215735 次浏览

WARNING:需要注意的是,此解决方案将阻塞future.get()中的调用线程。


如果你想处理任务抛出的异常,那么通常最好使用Callable而不是Runnable

Callable.call()被允许抛出受控异常,这些异常会传播回调用线程:

Callable task = ...
Future future = executor.submit(task);
// do something else in the meantime, and then...
try {
future.get();
} catch (ExecutionException ex) {
ex.getCause().printStackTrace();
}

如果Callable.call()抛出异常,则该异常将被包装在ExecutionException中并由Future.get()抛出。

这可能比继承ThreadPoolExecutor更可取。如果异常是可恢复的,它还为您提供了重新提交任务的机会。

而不是子类化ThreadPoolExecutor,我将为它提供一个ThreadFactory实例来创建新的线程,并为它们提供一个UncaughtExceptionHandler

文档:

注意:当动作被包含在 task(例如FutureTask) 显式地或通过方法 提交时,这些任务对象会捕获和 维护计算性异常和 所以它们不会引起突然性 终止,和内部 异常不会传递给this 方法。< / p >

当你提交一个Runnable时,它会被包装在一个Future中。

你的afterExecute应该是这样的:

public final class ExtendedExecutor extends ThreadPoolExecutor {


// ...


protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Future<?> future = (Future<?>) r;
if (future.isDone()) {
future.get();
}
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
if (t != null) {
System.out.println(t);
}
}
}

这种行为的解释在javadoc for afterExecute中:

注意:当动作被包含在 task(例如FutureTask) 显式地或通过方法 提交时,这些任务对象会捕获和 维护计算性异常和 所以它们不会引起突然性 终止,和内部 异常不会传递给this 方法。< / p >

我使用了来自jcabi-logVerboseRunnable类,它吞噬所有异常并记录它们。非常方便,例如:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
new VerboseRunnable(
Runnable() {
public void run() {
// the code, which may throw
}
},
true // it means that all exceptions will be swallowed and logged
),
1, 1, TimeUnit.MILLISECONDS
);

我通过将提供的可运行文件打包提交给执行程序来解决这个问题。

CompletableFuture.runAsync(() -> {
try {
runnable.run();
} catch (Throwable e) {
Log.info(Concurrency.class, "runAsync", e);
}
}, executorService);

如果你的ExecutorService来自外部源(即不可能子类化ThreadPoolExecutor并覆盖afterExecute()),你可以使用动态代理来实现所需的行为:

public static ExecutorService errorAware(final ExecutorService executor) {
return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[] {ExecutorService.class},
(proxy, method, args) -> {
if (method.getName().equals("submit")) {
final Object arg0 = args[0];
if (arg0 instanceof Runnable) {
args[0] = new Runnable() {
@Override
public void run() {
final Runnable task = (Runnable) arg0;
try {
task.run();
if (task instanceof Future<?>) {
final Future<?> future = (Future<?>) task;


if (future.isDone()) {
try {
future.get();
} catch (final CancellationException ce) {
// Your error-handling code here
ce.printStackTrace();
} catch (final ExecutionException ee) {
// Your error-handling code here
ee.getCause().printStackTrace();
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
} catch (final RuntimeException re) {
// Your error-handling code here
re.printStackTrace();
throw re;
} catch (final Error e) {
// Your error-handling code here
e.printStackTrace();
throw e;
}
}
};
} else if (arg0 instanceof Callable<?>) {
args[0] = new Callable<Object>() {
@Override
public Object call() throws Exception {
final Callable<?> task = (Callable<?>) arg0;
try {
return task.call();
} catch (final Exception e) {
// Your error-handling code here
e.printStackTrace();
throw e;
} catch (final Error e) {
// Your error-handling code here
e.printStackTrace();
throw e;
}
}
};
}
}
return method.invoke(executor, args);
});
}

另一个解决方案是使用ManagedTaskManagedTaskListener

你需要一个可调用的可运行的来实现接口ManagedTask

方法getManagedTaskListener返回你想要的实例。

public ManagedTaskListener getManagedTaskListener() {

你在ManagedTaskListener中实现了taskDone方法:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
if (exception != null) {
LOGGER.log(Level.SEVERE, exception.getMessage());
}
}

关于托管任务生命周期和侦听器的更多细节。

这是因为AbstractExecutorService :: submit将你的runnable包装成RunnableFuture(只有FutureTask),如下所示

AbstractExecutorService.java


public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
execute(ftask);
return ftask;
}

然后execute将其传递给Worker,而Worker.run()将调用下面的。

ThreadPoolExecutor.java


final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted.  This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();           /////////HERE////////
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

最后task.run();在上面的代码调用将调用 FutureTask.run()。下面是异常处理程序代码,因为

class FutureTask<V> implements RunnableFuture<V>


public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {   /////////HERE////////
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

如果希望监视任务的执行,可以旋转1或2个线程(可能更多,这取决于负载),并使用它们从ExecutionCompletionService包装器中获取任务。

这是

  • 它来源于SingleThreadExecutor,但是你可以很容易地适应它
  • Java 8的lamdas代码,但是很容易修复

它会创建一个Executor单线程,可以得到很多任务;并将等待当前的一个结束执行,然后开始执行下一个

如果出现uncaugth错误或异常,uncaughtExceptionHandler将捕获它

public final class SingleThreadExecutorWithExceptions {


public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {


ThreadFactory factory = (Runnable runnable)  -> {
final Thread newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions");
newThread.setUncaughtExceptionHandler( (final Thread caugthThread,final Throwable throwable) -> {
uncaughtExceptionHandler.uncaughtException(caugthThread, throwable);
});
return newThread;
};
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(),
factory){




protected void afterExecute(Runnable runnable, Throwable throwable) {
super.afterExecute(runnable, throwable);
if (throwable == null && runnable instanceof Future) {
try {
Future future = (Future) runnable;
if (future.isDone()) {
future.get();
}
} catch (CancellationException ce) {
throwable = ce;
} catch (ExecutionException ee) {
throwable = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (throwable != null) {
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable);
}
}
});
}






private static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}


/**
* A wrapper class that exposes only the ExecutorService methods
* of an ExecutorService implementation.
*/
private static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future submit(Runnable task) {
return e.submit(task);
}
public  Future submit(Callable task) {
return e.submit(task);
}
public  Future submit(Runnable task, T result) {
return e.submit(task, result);
}
public  List> invokeAll(Collection> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public  List> invokeAll(Collection> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public  T invokeAny(Collection> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public  T invokeAny(Collection> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}






private SingleThreadExecutorWithExceptions() {}
}

这与mmm的解决方案类似,但更容易理解。让你的任务扩展一个封装run()方法的抽象类。

public abstract Task implements Runnable {


public abstract void execute();


public void run() {
try {
execute();
} catch (Throwable t) {
// handle it
}
}
}




public MySampleTask extends Task {
public void execute() {
// heavy, error-prone code here
}
}

医生的例子并没有给我想要的结果。

当一个线程进程被放弃(显式的interput();s)异常出现。

我还想保留“系统退出”;普通主线程具有的典型throw功能,我想这样做,以便程序员不必被迫在代码上工作而不得不担心它的上下文(…一个线程),如果出现任何错误,它必须是一个编程错误,或者这种情况必须解决的地方,手动捕获…没有必要过于复杂。

所以我修改了代码来满足我的需要。

    @Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
Future<?> future = (Future<?>) r;
boolean terminate = false;
try {
future.get();
} catch (ExecutionException e) {
terminate = true;
e.printStackTrace();
} catch (InterruptedException | CancellationException ie) {// ignore/reset
Thread.currentThread().interrupt();
} finally {
if (terminate) System.exit(0);
}
}
}

但是要小心,这段代码基本上将你的线程转换为主线程异常,同时保持所有的并行属性…但让我们现实一点,在系统并行机制(extends Thread)的功能中设计架构是错误的方法,恕我直言……除非严格要求使用事件驱动设计....但后来…如果这是需求,那么问题是:在这种情况下是否需要ExecutorService ?……也许不是。