不可能使缓存线程池的大小限制?

似乎不可能创建一个缓存的线程池,并限制它可以创建的线程数量。

下面是静态 Executors.newCachedThreadPool在标准 Java 库中的实现方式:

 public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

因此,使用该模板继续创建一个固定大小的缓存线程池:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronusQueue<Runable>());

现在,如果你使用这个并提交3个任务,一切都会好起来。提交任何进一步的任务将导致拒绝执行异常。

试试这个:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runable>());

将导致所有线程顺序执行。也就是说,线程池永远不会产生多个线程来处理您的任务。

这是 ThreadPoolExecutor执行方法中的一个错误吗?或者这是故意的?还有别的办法吗?

编辑: 我想要一个类似于缓存线程池的东西(它根据需要创建线程,然后在一段时间的超时后杀死它们) ,但是对它可以创建的线程数量有限制,并且在线程达到限制后仍然可以继续对其他任务进行排队。根据 Sjlee 的回答,这是不可能的。看看 ThreadPoolExecutorexecute()方法确实是不可能的。我需要像 SwingWorker那样子类 ThreadPoolExecutor并覆盖 execute(),但是 SwingWorker在它的 execute()中所做的是一个完整的黑客行为。

59755 次浏览

这就是你想要的(至少我是这么想的)

Executors.newFixedThreadPool(int n)

创建一个线程池,该线程池重用在共享无界队列上操作的固定数量的线程。在任何时候,nThreads 线程最多都是活动处理任务。如果在所有线程都处于活动状态时提交了其他任务,则这些任务将在队列中等待,直到有一个线程可用。如果任何线程在关闭之前由于执行失败而终止,那么如果需要执行后续任务,将会有一个新的线程取代它。在显式关闭之前,池中的线程将一直存在。

根据 Javadoc for ThreadPoolExecator:

如果运行的线程大于 corePoolSize 但小于 maxumPoolSize,则将创建一个新线程 只有在队列满员的情况下。通过将 corePoolSize 和 maxumPoolSize 设置为相同,可以创建一个固定大小的线程池。

(重点是我的)

Jitter 的回答是你想要的,尽管我的回答是你的另一个问题:)

在第一个示例中,后续任务被拒绝,因为 AbortPolicy是默认的 RejectedExecutionHandler。ThreadPoolExecator 包含以下策略,您可以通过 setRejectedExecutionHandler方法更改这些策略:

CallerRunsPolicy
AbortPolicy
DiscardPolicy
DiscardOldestPolicy

这听起来像是您想要具有 CallerRunsPolicy 的缓存线程池。

ThreadPoolExecutor有以下几个关键行为,您的问题可以用这些行为来解释。

当提交任务时,

  1. 如果线程池没有达到核心大小,它将创建新线程。
  2. 如果已经达到核心大小,并且没有空闲线程,那么它将对任务进行排队。
  3. 如果已达到核心大小,则不存在空闲线程,并且队列已满,则会创建新线程(直到达到最大大小)。
  4. 如果已达到最大大小,则不存在空闲线程,并且队列已满,则执行拒绝策略。

在第一个示例中,请注意 SynchronousQueue的大小基本上为0。因此,一旦你达到最大尺寸(3) ,拒绝策略就会生效(# 4)。

在第二个示例中,选择的队列是 LinkedBlockingQueue,其大小不受限制。因此,你陷入了行为 # 2。

您不能对缓存类型或固定类型进行太多的修改,因为它们的行为几乎是完全确定的。

如果您想要有一个有界和动态的线程池,您需要使用一个正的内核大小和最大大小,并结合使用一个有限大小的队列。比如说,

new ThreadPoolExecutor(10, // core size
50, // max size
10*60, // idle timeout
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(20)); // queue with a size

附录 : 这是一个相当古老的答案,而且当涉及到核心大小为0时,JDK 似乎改变了它的行为。自 JDK 1.6以来,如果核心大小为0且池中没有任何线程,ThreadPoolExecator 将添加一个线程来执行该任务。因此,核心大小为0是上述规则的一个例外。感谢 史蒂夫带着给我的注意。

这里的所有答案都没有解决我的问题,我的问题与使用 Apache 的 HTTP 客户端(3.x 版本)创建有限数量的 HTTP 连接有关。由于我花了几个小时才想出一个好的设置,我将分享:

private ExecutorService executor = new ThreadPoolExecutor(5, 10, 60L,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());

这将创建一个 ThreadPoolExecutor,该 ThreadPoolExecutor从5开始,最多保存10个使用 CallerRunsPolicy执行的同时运行的线程。

还有一个选择。与使用新的 SynchronousQueue 不同,您还可以使用任何其他队列,但是您必须确保其大小为1,这样将强制 Executorservice 创建新线程。

看起来似乎没有任何答案能够回答这个问题——事实上,我看不出有什么方法可以做到这一点——即使你从 PooledExecutorService 继承子类,因为很多方法/属性都是私有的,例如,让 addIfUnderMaximumPoolSize 受到保护,你可以做到以下几点:

class MyThreadPoolService extends ThreadPoolService {
public void execute(Runnable run) {
if (poolSize() == 0) {
if (addIfUnderMaximumPoolSize(run) != null)
return;
}
super.execute(run);
}
}

我得到的最接近的答案是这样的——但即使这样也不是一个很好的解决方案

new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()) {
public void execute(Runnable command) {
if (getPoolSize() == 0 && getActiveCount() < getMaximumPoolSize()) {
super.setCorePoolSize(super.getCorePoolSize() + 1);
}
super.execute(command);
}


protected void afterExecute(Runnable r, Throwable t) {
// nothing in the queue
if (getQueue().isEmpty() && getPoolSize() > min) {
setCorePoolSize(getCorePoolSize() - 1);
}
};
};

附注: 以上未经测试

还有一个办法。我认为这个解决方案的行为符合您的期望(尽管并不为这个解决方案感到自豪) :

final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
public boolean offer(Runnable o) {
if (size() > 1)
return false;
return super.offer(o);
};


public boolean add(Runnable o) {
if (super.offer(o))
return true;
else
throw new IllegalStateException("Queue full");
}
};


RejectedExecutionHandler handler = new RejectedExecutionHandler() {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
queue.add(r);
}
};


dbThreadExecutor =
new ThreadPoolExecutor(min, max, 60L, TimeUnit.SECONDS, queue, handler);

除非我遗漏了什么,否则原始问题的解决方案很简单。下面的代码实现了原始海报所描述的所需行为。它将产生多达5个线程,在无限制队列上工作,空闲线程将在60秒后终止。

tp = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
tp.allowCoreThreadTimeOut(true);

有同样的问题 因为没有其他答案能把所有问题放在一起,所以我要加上我的:

现在,它清楚地写在 医生: 如果您使用一个队列,不阻塞(LinkedBlockingQueue)最大线程设置没有任何效果,只有核心线程使用。

所以:

public class MyExecutor extends ThreadPoolExecutor {


public MyExecutor() {
super(4, 4, 5,TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
allowCoreThreadTimeOut(true);
}


public void setThreads(int n){
setMaximumPoolSize(Math.max(1, n));
setCorePoolSize(Math.max(1, n));
}


}

这位遗嘱执行人:

  1. 没有最大线程的概念,因为我们使用的是无界队列。这是一件好事,因为这样的队列可能导致执行器创建大量的非核心,额外的线程,如果它遵循其通常的策略。

  2. 最大大小 Integer.MAX_VALUE的队列。如果挂起的任务数超过 Integer.MAX_VALUESubmit()将抛出 RejectedExecutionException。不确定我们是否会先用完内存,否则就会发生这种情况。

  3. 可能有4个核心线程。如果空闲5秒,空闲的核心线程会自动退出。使用 setThreads()方法可以改变数量。

  4. 确保核心线程的最小数量不少于一个,否则 submit()将拒绝每个任务。因为核心线程需要 > = max 线程,所以方法 setThreads()也设置 max 线程,尽管 max 线程设置对于无限制队列是无用的。

  1. 您可以按照 < strong >@sjlee 的建议使用 ThreadPoolExecutor

    您可以动态地控制池的大小:

    动态线程池

    或者

  2. 你可以使用 偷窃工作池 API,它是由 java 8引入的。

    public static ExecutorService newWorkStealingPool()
    

    使用所有可用处理器作为目标并行级别,创建一个偷取工作的线程池。

默认情况下,并行级别设置为服务器中 CPU 核的数量。如果您有4个核心 CPU 服务器,那么线程池的大小应该是4。此 API 返回 ForkJoinPool类型的 ExecutorService,并允许通过从 ForkJoinPool 中繁忙线程窃取任务来窃取空闲线程的工作。

问题概述如下:

我想要一些类似于缓存线程池的东西(它根据需要创建线程,然后在一段时间超时后杀死它们) ,但是对它可以创建的线程数量有限制,并且一旦达到线程限制,它就可以继续对其他任务进行排队。

在指出解决方案之前,我将解释为什么下列解决方案不起作用:

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

当达到3的限制时,这将不会对任何任务排队,因为根据定义,SynchronousQueue 不能保存任何元素。

new ThreadPoolExecutor(0, 3, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>());

这将不会创建多于一个线程,因为如果队列已满,ThreadPoolExecator 只会创建超过 corePoolSize 的线程。但 LinkedBlockingQueue 永远不会满。

ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
executor.allowCoreThreadTimeOut(true);

在达到 corePoolSize 之前,这将不会重用线程,因为即使现有线程处于空闲状态,ThreadPoolExecator 也会增加线程的数量,直到达到 corePoolSize 为止。如果你能忍受这个缺点,那么这就是解决这个问题的最简单的方法。它也是“ Java 实践中的并发”(p172的脚注)中描述的解决方案。

所描述问题的唯一完整解决方案似乎是覆盖队列的 offer方法并编写一个 RejectedExecutionHandler,正如这个问题的答案所解释的那样: 如何让 ThreadPoolExecector 在排队之前将线程增加到最大值?

这适用于 Java8 + (以及其他目前... ...)

     Executor executor = new ThreadPoolExecutor(3, 3, 5, TimeUnit.SECONDS,
new LinkedBlockingQueue<>())\{\{allowCoreThreadTimeOut(true);}};

其中3是线程数量的限制,5是空闲线程的超时。

如果你想要 看看你自己能不能用,这里是完成这项工作的代码:

public static void main(String[] args) throws InterruptedException {
final int DESIRED_NUMBER_OF_THREADS=3; // limit of number of Threads for the task at a time
final int DESIRED_THREAD_IDLE_DEATH_TIMEOUT=5; //any idle Thread ends if it remains idle for X seconds


System.out.println( java.lang.Thread.activeCount() + " threads");
Executor executor = new ThreadPoolExecutor(DESIRED_NUMBER_OF_THREADS, DESIRED_NUMBER_OF_THREADS, DESIRED_THREAD_IDLE_DEATH_TIMEOUT, TimeUnit.SECONDS,
new LinkedBlockingQueue<>()) \{\{allowCoreThreadTimeOut(true);}};


System.out.println(java.lang.Thread.activeCount() + " threads");


for (int i = 0; i < 5; i++) {
final int fi = i;
executor.execute(() -> waitsout("starting hard thread computation " + fi, "hard thread computation done " + fi,2000));
}
System.out.println("If this is UP, it works");


while (true) {
System.out.println(
java.lang.Thread.activeCount() + " threads");
Thread.sleep(700);
}


}


static void waitsout(String pre, String post, int timeout) {
try {
System.out.println(pre);
Thread.sleep(timeout);
System.out.println(post);
} catch (Exception e) {
}
}

上面代码的输出对我来说是

1 threads
1 threads
If this is UP, it works
starting hard thread computation 0
4 threads
starting hard thread computation 2
starting hard thread computation 1
4 threads
4 threads
hard thread computation done 2
hard thread computation done 0
hard thread computation done 1
starting hard thread computation 3
starting hard thread computation 4
4 threads
4 threads
4 threads
hard thread computation done 3
hard thread computation done 4
4 threads
4 threads
4 threads
4 threads
3 threads
3 threads
3 threads
1 threads
1 threads

我建议使用 信号方法

来自 信号执行者课程:

只有当提供的队列从 offer ()返回 false 时,ThreadPoolExecator 才会创建一个新线程。这意味着,如果给它一个无界的队列,无论队列有多长,它都只会创建1个线程。但是,如果绑定队列并提交的可运行程序数量超过线程数量,则任务将被拒绝并引发异常。因此,我们创建一个队列,如果它不是空的,那么它总是返回 false,以确保创建新线程。然后,如果任务被拒绝,我们只需将其添加到队列中。

    public static ExecutorService newCachedBoundedExecutor(final String name, int minThreads, int maxThreads) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(minThreads,
maxThreads,
30,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>() {
@Override
public boolean offer(Runnable runnable) {
if (size() > 1 && size() <= maxThreads) {
//create new thread
return false;
} else {
return super.offer(runnable);
}
}
}, new NumberedThreadFactory(name));


threadPool.setRejectedExecutionHandler((runnable, executor) -> {
try {
executor.getQueue().put(runnable);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});


return threadPool;
}