是否有使用当前线程的 ExecutorService?

我所追求的是一种兼容的方式来配置线程池的使用与否。理想情况下,代码的其余部分不应该受到任何影响。我可以使用一个只有一个线程的线程池,但这并不是我想要的。有什么想法吗?

ExecutorService es = threads == 0 ? new CurrentThreadExecutor() : Executors.newThreadPoolExecutor(threads);


// es.execute / es.submit / new ExecutorCompletionService(es) etc
48021 次浏览

You can use the RejectedExecutionHandler to run the task in the current thread.

public static final ThreadPoolExecutor CURRENT_THREAD_EXECUTOR = new ThreadPoolExecutor(0, 0, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
r.run();
}
});

You only need one of these ever.

Here's a really simple Executor (not ExecutorService, mind you) implementation that only uses the current thread. Stealing this from "Java Concurrency in Practice" (essential reading).

public class CurrentThreadExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}

ExecutorService is a more elaborate interface, but could be handled with the same approach.

I wrote an ExecutorService based on the AbstractExecutorService.

/**
* Executes all submitted tasks directly in the same thread as the caller.
*/
public class SameThreadExecutorService extends AbstractExecutorService {


//volatile because can be viewed by other threads
private volatile boolean terminated;


@Override
public void shutdown() {
terminated = true;
}


@Override
public boolean isShutdown() {
return terminated;
}


@Override
public boolean isTerminated() {
return terminated;
}


@Override
public boolean awaitTermination(long theTimeout, TimeUnit theUnit) throws InterruptedException {
shutdown(); // TODO ok to call shutdown? what if the client never called shutdown???
return terminated;
}


@Override
public List<Runnable> shutdownNow() {
return Collections.emptyList();
}


@Override
public void execute(Runnable theCommand) {
theCommand.run();
}
}

You can use Guava's MoreExecutors.newDirectExecutorService(), or MoreExecutors.directExecutor() if you don't need an ExecutorService.

If including Guava is too heavy-weight, you can implement something almost as good:

public final class SameThreadExecutorService extends ThreadPoolExecutor {
private final CountDownLatch signal = new CountDownLatch(1);


private SameThreadExecutorService() {
super(1, 1, 0, TimeUnit.DAYS, new SynchronousQueue<Runnable>(),
new ThreadPoolExecutor.CallerRunsPolicy());
}


@Override public void shutdown() {
super.shutdown();
signal.countDown();
}


public static ExecutorService getInstance() {
return SingletonHolder.instance;
}


private static class SingletonHolder {
static ExecutorService instance = createInstance();
}


private static ExecutorService createInstance() {
final SameThreadExecutorService instance
= new SameThreadExecutorService();


// The executor has one worker thread. Give it a Runnable that waits
// until the executor service is shut down.
// All other submitted tasks will use the RejectedExecutionHandler
// which runs tasks using the  caller's thread.
instance.submit(new Runnable() {
@Override public void run() {
boolean interrupted = false;
try {
while (true) {
try {
instance.signal.await();
break;
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}});
return Executors.unconfigurableScheduledExecutorService(instance);
}
}

Java 8 style:

Executor e = Runnable::run;

I had to use the same "CurrentThreadExecutorService" for testing purposes and, although all suggested solutions were nice (particularly the one mentioning the Guava way), I came up with something similar to what Peter Lawrey suggested here.

As mentioned by Axelle Ziegler here, unfortunately Peter's solution won't actually work because of the check introduced in ThreadPoolExecutor on the maximumPoolSize constructor parameter (i.e. maximumPoolSize can't be <=0).

In order to circumvent that, I did the following:

private static ExecutorService currentThreadExecutorService() {
CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
return new ThreadPoolExecutor(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), callerRunsPolicy) {
@Override
public void execute(Runnable command) {
callerRunsPolicy.rejectedExecution(command, this);
}
};
}