How to get the ThreadPoolExecutor to increase threads to max before queueing?

一段时间以来,我一直对 ThreadPoolExecutor的默认行为感到沮丧,因为它支持我们许多人使用的 ExecutorService线程池。引用 Javadocs 的一句话:

If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created 只有在队列满员的情况下.

这意味着如果使用以下代码定义一个线程池,它将启动第二个线程,因为 LinkedBlockingQueue是无界的。

ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue*/));

只有当您有一个 有界队列并且该队列是 满了时,才会启动核心数以上的任何线程。我怀疑许多初级 Java 多线程程序员不知道 ThreadPoolExecutor的这种行为。

Now I have specific use case where this is not-optimal. I'm looking for ways, without writing my own TPE class, to work around it.

My requirements are for a web service that is making call-backs to a possibly unreliable 3rd party.

  • 我不想让回调与网络请求同步,所以我想使用一个线程池。
  • 我通常每分钟都会收到一些这样的信息,所以我不希望 newFixedThreadPool(...)中有大量的线程,而这些线程大多数都处于休眠状态。
  • 每隔一段时间,我就会得到一个这样的流量,我想把线程的数量放大到某个最大值(比如说50)。
  • 我需要做一个 最好的尝试做所有的回调,所以我想队列任何额外的50以上。我不想使用 newCachedThreadPool()使我的网络服务器的其他部分不堪重负。

在需要绑定队列并启动完整的 之前更多线程的 ThreadPoolExecutor中,如何解决这个限制?如何让它启动更多线程 之前排队任务?

编辑:

@ Flavio 很好地阐述了如何使用 ThreadPoolExecutor.allowCoreThreadTimeOut(true)实现核心线程的超时和退出。我考虑过这一点,但我仍然想要核心线程特性。如果可能的话,我不希望池中的线程数量低于核心大小。

37657 次浏览

ThreadPoolExecutor中,在启动更多线程之前,队列需要被限制并且满了,我如何才能绕过这个限制呢。

我相信我终于找到了一个有点优雅(也许有点粗糙)的解决方案来解决 ThreadPoolExecutor的这个限制。它涉及到扩展 LinkedBlockingQueue,使其在已经有一些任务排队的情况下为 queue.offer(...)返回 false。如果当前线程跟不上排队的任务,TPE 将添加额外的线程。如果池已经处于最大线程,那么将调用 RejectedExecutionHandler,将 put(...)放入队列。

编写一个队列,其中 offer(...)可以返回 false,而 put()从不阻塞,这当然很奇怪,所以这就是黑客攻击的部分。但是这与 TPE 对队列的使用很好地结合在一起,因此我认为这样做没有任何问题。

密码是这样的:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
private static final long serialVersionUID = -6903933921423432194L;
@Override
public boolean offer(Runnable e) {
// Offer it to the queue if there is 0 items already queued, else
// return false so the TPE will add another thread. If we return false
// and max threads have been reached then the RejectedExecutionHandler
// will be called which will do the put into the queue.
if (size() == 0) {
return super.offer(e);
} else {
return false;
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// This does the actual put into the queue. Once the max threads
//  have been reached, the tasks will then queue up.
executor.getQueue().put(r);
// we do this after the put() to stop race conditions
if (executor.isShutdown()) {
throw new RejectedExecutionException(
"Task " + r + " rejected from " + e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});

有了这种机制,当我向队列提交任务时,ThreadPoolExecutor将:

  1. 最初将线程的数量放大到核心大小(这里是1)。
  2. Offer it to the queue. If the queue is empty it will be queued to be handled by the existing threads.
  3. If the queue has 1 or more elements already, the offer(...) will return false.
  4. 如果返回 false,则扩展池中的线程数量,直到它们达到最大数量(这里是50)。
  5. 如果在最大值,那么它调用 RejectedExecutionHandler
  6. 然后,RejectedExecutionHandler将任务放入队列,由第一个可用线程按 FIFO 顺序处理。

虽然在上面的示例代码中,队列是无界的,但是您也可以将它定义为有界队列。例如,如果你给 LinkedBlockingQueue加上1000的容量,那么它会:

  1. scale the threads up to max
  2. then queue up until it is full with 1000 tasks
  3. 然后阻塞调用方,直到队列有空间可用。

此外,如果需要在 然后,您可以使用 offer(E, long, TimeUnit)方法而不是使用 Long.MAX_VALUE作为超时。

警告:

如果您期望任务被添加到执行器 之后,它已经关闭,那么您可能希望在执行器服务已经关闭时将 RejectedExecutionException从我们的自定义 RejectedExecutionHandler中丢出。感谢@RaduToader 指出这一点。

Edit:

这个答案的另一个调整可能是询问 TPE 是否存在空闲线程,并且只有在存在空闲线程时才对该项进行排队。您必须为此创建一个真正的类,并在其上添加 ourQueue.setThreadPoolExecutor(tpe);方法。

那么你的 offer(...)方法可能看起来像这样:

  1. 检查 tpe.getPoolSize() == tpe.getMaximumPoolSize()在这种情况下是否调用 super.offer(...)
  2. 否则,如果 tpe.getPoolSize() > tpe.getActiveCount()然后调用 super.offer(...),因为似乎有空闲线程。
  3. 否则返回 false以分叉另一个线程。

也许是这样:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
return super.offer(e);
} else {
return false;
}

注意,TPE 上的 get 方法开销很大,因为它们访问 volatile字段,或者(在 getActiveCount()的情况下)锁定 TPE 并遍历线程列表。此外,这里还存在竞态条件,可能导致任务不正确地排队,或者当存在空闲线程时另一个线程被分叉。

将核心大小和最大大小设置为相同的值,并允许使用 allowCoreThreadTimeOut(true)从池中删除核心线程。

我能想到的最好的解决办法是扩展。

ThreadPoolExecutor提供了几种钩子方法: beforeExecuteafterExecute。在您的扩展中,您可以维护使用一个有界队列来提供任务,使用第二个无界队列来处理溢出。当有人调用 submit时,您可以尝试将请求放入有界队列中。如果遇到异常,只需将该任务粘贴到溢出队列中。然后,您可以利用 afterExecute钩子来查看在完成任务之后溢出队列中是否有任何内容。这样,执行程序将首先处理其有界队列中的内容,并在时间允许的情况下自动从这个无界队列中提取内容。

这似乎比您的解决方案更费工夫,但至少它不会给队列带来意想不到的行为。我还想象有一种更好的方法来检查队列和线程的状态,而不是依赖异常,因为异常的抛出速度相当慢。

我们有一个 ThreadPoolExecutor的子类,它接受额外的 creationThreshold并覆盖 execute

public void execute(Runnable command) {
super.execute(command);
final int poolSize = getPoolSize();
if (poolSize < getMaximumPoolSize()) {
if (getQueue().size() > creationThreshold) {
synchronized (this) {
setCorePoolSize(poolSize + 1);
setCorePoolSize(poolSize);
}
}
}
}

maybe that helps too, but yours looks more artsy of course…

注意: 我现在更喜欢并推荐 my other answer

这个版本对我来说更加简单: 每当执行一个新任务时,就增加 corePoolSize (最大值 PoolSize) ,然后每当完成一个任务时,就减小 corePoolSize (最小值为用户指定的“ core pool size”)。

To put it another way, keep track of the number of running or enqueued tasks, and ensure that the corePoolSize is equal to the number of tasks as long as it is between the user specified "core pool size" and the maximumPoolSize.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
private int userSpecifiedCorePoolSize;
private int taskCount;


public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
userSpecifiedCorePoolSize = corePoolSize;
}


@Override
public void execute(Runnable runnable) {
synchronized (this) {
taskCount++;
setCorePoolSizeToTaskCountWithinBounds();
}
super.execute(runnable);
}


@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
super.afterExecute(runnable, throwable);
synchronized (this) {
taskCount--;
setCorePoolSizeToTaskCountWithinBounds();
}
}


private void setCorePoolSizeToTaskCountWithinBounds() {
int threads = taskCount;
if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
setCorePoolSize(threads);
}
}

在编写时,该类不支持在构造后更改用户指定的 corePoolSize 或 maxumPoolSize,也不支持直接或通过 remove()purge()操作工作队列。

注意: 我现在更喜欢并推荐 我的另一个答案

我有另一个建议,以下改变队列返回假的原始想法。在这种情况下,所有的任务都可以进入队列,但是每当一个任务在 execute()之后排队时,我们都会在后面跟着一个哨兵不可操作任务,队列会拒绝这个任务,从而产生一个新的线程,这个线程会立即执行不可操作任务,然后从队列中产生一些东西。

因为工作线程可能正在轮询 LinkedBlockingQueue以寻找新任务,所以即使有可用线程,任务也有可能进入队列。为了避免在有线程可用时产生新线程,我们需要跟踪有多少线程正在等待队列上的新任务,并且只有在队列上的任务比等待线程多时才产生新线程。

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };


final AtomicInteger waitingThreads = new AtomicInteger(0);


BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
@Override
public boolean offer(Runnable e) {
// offer returning false will cause the executor to spawn a new thread
if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
else return super.offer(e);
}


@Override
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
try {
waitingThreads.incrementAndGet();
return super.poll(timeout, unit);
} finally {
waitingThreads.decrementAndGet();
}
}


@Override
public Runnable take() throws InterruptedException {
try {
waitingThreads.incrementAndGet();
return super.take();
} finally {
waitingThreads.decrementAndGet();
}
}
};


ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
@Override
public void execute(Runnable command) {
super.execute(command);
if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
}
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r == SENTINEL_NO_OP) return;
else throw new RejectedExecutionException();
}
});

关于这个问题,我已经有了另外两个答案,但我怀疑这个答案是最好的。

它基于 目前公认的答案的技术,即:

  1. 重写队列的 offer()方法(有时)返回 false,
  2. 这会导致 ThreadPoolExecutor产生新线程或拒绝任务,并且
  3. 在拒绝任务时将 RejectedExecutionHandler设置为 actually队列。

The problem is when offer() should return false. The currently accepted answer returns false when the queue has a couple of tasks on it, but as I've pointed out in my comment there, this causes undesirable effects. Alternately, if you always return false, you'll keep spawning new threads even when you have threads waiting on the queue.

解决方案是使用 Java7LinkedTransferQueue并让 offer()调用 tryTransfer()。当有一个等待的使用者线程时,任务将被传递给该线程。否则,offer()将返回 false,而 ThreadPoolExecutor将产生一个新线程。

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
@Override
public boolean offer(Runnable e) {
return tryTransfer(e);
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});

The recommended answer resolves only one (1) of the issue with the JDK thread pool:

  1. JDK 线程池偏向于排队。因此,它们不会生成新线程,而是将任务排队。只有当队列达到其限制时,线程池才会产生一个新线程。

  2. Thread retirement does not happen when load lightens. For example if we have a burst of jobs hitting the pool that causes the pool to go to max, followed by light load of max 2 tasks at a time, the pool will use all threads to service the light load preventing thread retirement. (only 2 threads would be needed…)

对上面的行为不满意,我继续实现了一个池来克服上面的缺陷。

要解决2)使用 Lifo 调度解决了这个问题。这个想法是由 Ben Maurer 在2015年 ACM 应用会议上提出的: Systems@Facebook scale

因此,一种新的实现应运而生:

LifoThreadPoolExecutorSQP

到目前为止,该实现提高了 泽尔的异步执行性能。

The implementation is spin capable to reduce context switch overhead, yielding superior performance for certain use cases.

希望能帮上忙。

PS: JDK Fork Join Pool 实现 ExecutorService 并作为一个“正常”线程池工作,实现是性能良好的,它使用 LIFO 线程调度,但是没有控制内部队列大小,退休超时... ,最重要的任务不能被中断时取消它们

注意: 对于 JDK ThreadPoolExecator ,当队列有界时,只有当 offer 返回 false 时才会创建新线程。您可以使用 CallerRunsPolicy获得一些有用的东西,它创建了一点 Backpreschand 直接调用调用者线程中的 run。

我需要从由池创建的线程中执行任务,并且有一个有余的队列用于调度,而池中的线程数量可能是 grow心理医生,介于 CorePoolSizeMaxumPoolSize 最大池大小之间,所以..。

我最终从 线程池执行器改变做了一个 完全复制粘贴,稍微执行了一下方法,因为 不幸的是 这不能通过扩展来实现(它调用私有方法)。

我不想在新请求到达并且所有线程都很忙的时候立即产生新线程(因为通常我的任务都很短暂)。我已经添加了一个阈值,但您可以根据自己的需要随意更改它(也许对于大多数 IO 来说,移除这个阈值更好)

private final AtomicInteger activeWorkers = new AtomicInteger(0);
private volatile double threshold = 0.7d;


protected void beforeExecute(Thread t, Runnable r) {
activeWorkers.incrementAndGet();
}
protected void afterExecute(Runnable r, Throwable t) {
activeWorkers.decrementAndGet();
}
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();


int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}


if (isRunning(c) && this.workQueue.offer(command)) {
int recheck = this.ctl.get();
if (!isRunning(recheck) && this.remove(command)) {
this.reject(command);
} else if (workerCountOf(recheck) == 0) {
this.addWorker((Runnable) null, false);
}
//>>change start
else if (workerCountOf(recheck) < maximumPoolSize //
&& (activeWorkers.get() > workerCountOf(recheck) * threshold
|| workQueue.size() > workerCountOf(recheck) * threshold)) {
this.addWorker((Runnable) null, false);
}
//<<change end
} else if (!this.addWorker(command, false)) {
this.reject(command);
}
}

下面是一个使用两个 Threadpool 的解决方案,两个 Threadpool 的核心和最大池大小相同。第二个池在第一个池忙时使用。

import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


public class MyExecutor {
ThreadPoolExecutor tex1, tex2;
public MyExecutor() {
tex1 = new ThreadPoolExecutor(15, 15, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
tex1.allowCoreThreadTimeOut(true);
tex2 = new ThreadPoolExecutor(45, 45, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
tex2.allowCoreThreadTimeOut(true);
}


public Future<?> submit(Runnable task) {
ThreadPoolExecutor ex = tex1;
int excessTasks1 = tex1.getQueue().size() + tex1.getActiveCount() - tex1.getCorePoolSize();
if (excessTasks1 >= 0) {
int excessTasks2 = tex2.getQueue().size() + tex2.getActiveCount() - tex2.getCorePoolSize();;
if (excessTasks2 <= 0 || excessTasks2 / (double) tex2.getCorePoolSize() < excessTasks1 / (double) tex1.getCorePoolSize()) {
ex = tex2;
}
}
return ex.submit(task);
}
}