ThreadPoolExecutor blocking when its queue is full?

I am trying to execute lots of tasks using a ThreadPoolExecutor:

def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
    threadPoolExecutor.execute(runnable)

The problem is that I quickly get a java.util.concurrent.RejectedExecutionException because the number of tasks exceeds the size of the work queue. However, the behavior I am looking for is to have the main thread block until there is room in the queue. What is the best way to accomplish this?


In some very narrow circumstances, you can implement a java.util.concurrent.RejectedExecutionHandler that does what you need.

RejectedExecutionHandler block = new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // Block until space becomes available in the queue.
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
        }
    }
};

ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);

This is generally a very bad idea, for the following reasons:

  • It is prone to deadlock, because all the threads in the pool may die before the thing you put in the queue becomes visible. Mitigate this by setting a reasonable keep-alive time.
  • The task is not wrapped the way your executor may expect. Many executor implementations wrap their tasks in some sort of tracking object before execution. Look at the source of yours.
  • Adding via getQueue() is strongly discouraged by the API, and may be prohibited at some point.

An almost-always-better strategy is to install ThreadPoolExecutor.CallerRunsPolicy, which throttles your application by running the task on the thread that is calling execute().
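For reference, installing that policy only requires passing it as the last constructor argument (or via setRejectedExecutionHandler()); a minimal sketch reusing the setup from the question:

BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(3, false);
ThreadPoolExecutor pool = new ThreadPoolExecutor(
        3, 3, 1L, TimeUnit.HOURS, workQueue,
        new ThreadPoolExecutor.CallerRunsPolicy()); // overflow tasks run on the submitting thread

for (int i = 0; i < 100000; i++) {
    pool.execute(runnable); // no RejectedExecutionException unless the pool is shut down
}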

However, sometimes a blocking strategy, with all its inherent risks, is really what you want. I'd say under these conditions:

  • You have only one thread calling execute().
  • You have to (or want to) have a very small queue length.
  • You absolutely need to limit the number of threads running this work (usually for external reasons), and a caller-runs strategy would break that.
  • Your tasks are of unpredictable size, so caller-runs could introduce starvation if the pool was momentarily busy with 4 short tasks and your one thread calling execute got stuck with a big one.

So, as I said, it's rarely needed and can be dangerous, but there you go.

Good luck.

Here is my code snippet for this case:

public void executeBlocking( Runnable command ) {
    if ( threadPool == null ) {
        logger.error( "Thread pool '{}' not initialized.", threadPoolName );
        return;
    }
    ThreadPool threadPoolMonitor = this;
    boolean accepted = false;
    do {
        try {
            threadPool.execute( new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    }
                    // to make sure that the monitor is freed on exit
                    finally {
                        // Notify all the threads waiting for the resource, if any.
                        synchronized ( threadPoolMonitor ) {
                            threadPoolMonitor.notifyAll();
                        }
                    }
                }
            } );
            accepted = true;
        }
        catch ( RejectedExecutionException e ) {
            // Thread pool is full
            try {
                // Block until one of the threads finishes its job and exits.
                synchronized ( threadPoolMonitor ) {
                    threadPoolMonitor.wait();
                }
            }
            catch ( InterruptedException ignored ) {
                // return immediately
                break;
            }
        }
    } while ( !accepted );
}

threadPool is a local instance of java.util.concurrent.ExecutorService that has already been initialized.

I solved this problem with a custom RejectedExecutionHandler which simply blocks the calling thread for a little while and then tries to submit the task again:

public class BlockWhenQueueFull implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}

        executor.execute(r);
    }
}

This class can then be used by the thread-pool executor as a RejectedExecutionHandler like any other:

executorPool = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())

The only downside I see is that the calling thread might get locked for slightly longer than strictly necessary (up to 250 ms). For many short-running tasks, the wait time could be decreased to 10 ms or so. Moreover, since this executor is effectively being called recursively, very long waits for a thread to become available (hours) may result in a stack overflow.

Nevertheless, I personally like this method. It is compact, easy to understand, and works well. Am I missing anything important?
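If the recursive re-submission is the main worry, the same "sleep and retry" idea can be expressed iteratively so the call stack stays flat. A rough sketch (the class name is made up, and offering to the queue directly bypasses any task wrapping the executor might otherwise do):

public class BlockWhenQueueFullLoop implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // Retry on the calling thread in a loop instead of re-entering execute() recursively.
        while (!executor.isShutdown()) {
            // offer() returns false immediately while the queue is still full.
            if (executor.getQueue().offer(r)) {
                return;
            }
            try {
                Thread.sleep(250);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}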

You could use a semaphore to block threads from going into the pool.

ExecutorService service = new ThreadPoolExecutor(
        3,
        3,
        1,
        TimeUnit.HOURS,
        new ArrayBlockingQueue<>(6, false)
);

Semaphore lock = new Semaphore(6); // equal to queue capacity

for (int i = 0; i < 100000; i++) {
    try {
        lock.acquire();
        service.submit(() -> {
            try {
                task.run();
            } finally {
                lock.release();
            }
        });
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

Some gotchas:

  • Only use this pattern with a fixed thread pool. The queue is unlikely to be full very often, so new threads won't be created. Check out the Java docs on ThreadPoolExecutor for more details: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html. There is a way around this, but it is out of scope for this answer.
  • The queue size should be higher than the number of core threads. If we were to make the queue size 3, what would eventually happen is:

    • T0: all three threads are doing work, the queue is empty, no permits are available.
    • T1: thread 1 finishes and releases a permit.
    • T2: thread 1 polls the queue for new work, finds none, and waits.
    • T3: the main thread submits work into the pool, and thread 1 starts working on it.

    The example above translates to the main thread blocking thread 1. It may seem like a tiny period, but now multiply that frequency by days and months. All of a sudden, those short periods of time add up to a large amount of time wasted.

What you need to do is to wrap your ThreadPoolExecutor into an Executor which explicitly limits the number of concurrently executed operations inside it:

private static class BlockingExecutor implements Executor {

    final Semaphore semaphore;
    final Executor delegate;

    private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
        semaphore = new Semaphore(concurrentTasksLimit);
        this.delegate = delegate;
    }

    @Override
    public void execute(final Runnable command) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }

        final Runnable wrapped = () -> {
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        };

        delegate.execute(wrapped);
    }
}

You can adjust concurrentTasksLimit to the threadPoolSize + queueSize of your delegate executor, and it will pretty much solve your problem.
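For instance, with the pool from the question plus a queue of 6, the wiring could look roughly like this (the numbers are illustrative, and the sketch assumes BlockingExecutor is accessible to the calling code):

ThreadPoolExecutor delegate = new ThreadPoolExecutor(
        3, 3, 1L, TimeUnit.HOURS, new ArrayBlockingQueue<>(6, false));

// threadPoolSize (3) + queueSize (6) = 9 tasks in flight before execute() starts blocking
Executor blockingExecutor = new BlockingExecutor(9, delegate);

for (int i = 0; i < 100000; i++) {
    blockingExecutor.execute(runnable);
}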

This is what I ended up doing:

int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();

for (int i = 0; i < 100000; i++) {
    try {
        lock.acquire();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    pool.execute(() -> {
        try {
            // Task logic
        } finally {
            lock.release();
        }
    });
}

A fairly straightforward option is to wrap your BlockingQueue with an implementation that calls put(..) when offer(..) is being invoked:

public class BlockOnOfferAdapter<E> implements BlockingQueue<E> {

    (..)

    @Override
    public boolean offer(E o) {
        try {
            delegate.put(o);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    (.. implement all other methods simply by delegating ..)

}

This works because, by default, put(..) waits until capacity becomes available in the queue when it is full:

/**
 * Inserts the specified element into this queue, waiting if necessary
 * for space to become available.
 *
 * @param e the element to add
 * @throws InterruptedException if interrupted while waiting
 * @throws ClassCastException if the class of the specified element
 *         prevents it from being added to this queue
 * @throws NullPointerException if the specified element is null
 * @throws IllegalArgumentException if some property of the specified
 *         element prevents it from being added to this queue
 */
void put(E e) throws InterruptedException;

No catching of RejectedExecutionException or complicated locking necessary.
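Usage then stays very close to the original code from the question; a rough sketch, assuming the adapter is given a constructor that wraps the delegate queue (not shown in the snippet above):

BlockingQueue<Runnable> workQueue =
        new BlockOnOfferAdapter<>(new ArrayBlockingQueue<Runnable>(3, false));
ThreadPoolExecutor threadPoolExecutor =
        new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue);

for (int i = 0; i < 100000; i++) {
    threadPoolExecutor.execute(runnable); // now blocks in offer(..)/put(..) instead of rejecting
}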

Okay, old thread, but this is what I found when searching for a blocking thread executor. My code tries to acquire a semaphore permit when the task is submitted to the task queue. This blocks if no permits are left. As soon as a task is done, the decorator releases the permit again. The scary part is that there is a possibility of losing a permit, but that could be solved with, for example, a timed job that simply clears permits on a schedule.

So my solution was:

class BlockingThreadPoolTaskExecutor(concurrency: Int) : ThreadPoolTaskExecutor() {
    companion object {
        lateinit var semaphore: Semaphore
    }

    init {
        semaphore = Semaphore(concurrency)
        val semaphoreTaskDecorator = SemaphoreTaskDecorator()
        this.setTaskDecorator(semaphoreTaskDecorator)
    }

    override fun <T> submit(task: Callable<T>): Future<T> {
        log.debug("submit")
        // Blocks here until a permit is available, i.e. until a running task releases one.
        semaphore.acquire()
        return super.submit(task)
    }
}

private class SemaphoreTaskDecorator : TaskDecorator {
    override fun decorate(runnable: Runnable): Runnable {
        log.debug("decorate")
        return Runnable {
            try {
                runnable.run()
            } finally {
                log.debug("decorate done")
                // Release the permit once the task has finished.
                BlockingThreadPoolTaskExecutor.semaphore.release()
            }
        }
    }
}

One can override ThreadPoolExecutor.execute(command) to use a Semaphore, e.g.:

/**
 * The setup answering the question needs to have:
 *
 * permits = 3
 * corePoolSize = permits (i.e. 3)
 * maximumPoolSize = corePoolSize (i.e. 3)
 * workQueue = anything different to null
 *
 * With this setup workQueue won’t actually be used but only
 * to check if it’s empty, which should always be the case.
 * Any more than permits as value for maximumPoolSize will have
 * no effect because at any moment no more than permits calls to
 * super.execute() will be allowed by the semaphore.
 */
public class ExecutionBlockingThreadPool extends ThreadPoolExecutor {
    private final Semaphore semaphore;

    // constructor setting super(…) parameters and initializing semaphore
    //
    // Below is a bare minimum constructor; using
    // corePoolSize = maximumPoolSize = permits
    // allows one to use SynchronousQueue because I expect
    // none other than isEmpty() to be called on it; it also
    // allows for using 0L SECONDS because no more than
    // corePoolSize threads should be attempted to create.
    public ExecutionBlockingThreadPool(int corePoolSize) {
        super(corePoolSize, corePoolSize, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(corePoolSize, true);
    }

    @Override
    public void execute(Runnable command) {
        try {
            // Block until a permit is available, i.e. until a running task finishes.
            semaphore.acquire();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("Interrupted while waiting to submit", e);
        }
        super.execute(() -> {
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        });
    }
}
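A quick usage sketch of the setup described in the comment above (doWork() is just a placeholder for the real task body):

ExecutionBlockingThreadPool pool = new ExecutionBlockingThreadPool(3); // permits = corePoolSize = 3

for (int i = 0; i < 100000; i++) {
    pool.execute(() -> doWork()); // blocks once 3 tasks are running, until one of them finishes
}
pool.shutdown();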

You can implement a rejected-task handler and get hold of all the rejected tasks whenever the queue is full. By default the executor uses the Abort policy, so from the handler you can add these tasks back to the queue, or make whatever other choice suits you.

public class ExecutorRejectedTaskHandlerFixedThreadPool {

    public static void main(String[] args) throws InterruptedException {

        // maximum queue size : 2
        BlockingQueue<Runnable> blockingQueue =
                new LinkedBlockingQueue<Runnable>(2);

        CustomThreadPoolExecutor executor =
                new CustomThreadPoolExecutor(4, 5, 5, TimeUnit.SECONDS,
                        blockingQueue);

        RejectedTaskHandler rejectedHandler = new RejectedTaskHandler();
        executor.setRejectedExecutionHandler(rejectedHandler);

        // submit 20 tasks for execution
        // Note: only 7 tasks (5 - max pool size, plus 2 - queue size) will be executed;
        // the rest will be rejected as the queue overflows
        for (int i = 0; i < 20; i++) {
            executor.execute(new Task());
        }
        System.out.println("Thread name " + Thread.currentThread().getName());
    }

    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Thread - " + Thread.currentThread().getName() + " performing it's job");
        }
    }

    static class RejectedTaskHandler implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("Task rejected" + r.toString());
        }
    }

    public static class CustomThreadPoolExecutor extends ThreadPoolExecutor {

        public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                        long keepAliveTime, TimeUnit unit,
                                        BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
        }
    }
}