线程池执行器中的核心池大小与最大池大小之间的关系

当我们谈论 ThreadPoolExecutor时,核心池尺寸核心池尺寸最大游泳池面积到底有什么不同?
能用例子来解释吗?

140335 次浏览

来自 医生:

当一个新任务在方法 execute 中提交时(java.lang.Runnable) , 并且运行的线程少于 corePoolSize,则新线程为 创建来处理请求,即使其他工作线程处于空闲状态。 如果有多于 corePoolSize 但小于 maxumPoolSize 线程运行时,只有当队列为 满了。

此外:

通过将 corePoolSize 和 maxumPoolSize 设置为相同,可以创建 一个固定大小的线程池 无界值,如 Integer.MAX _ VALUE,则允许池 可以容纳任意数量的并发任务, 核心和最大池大小仅在构造时设置,但它们 也可以使用 setCorePoolSize (int)和 SetMaximumPoolSize (int).

来自 这篇博文:

以此为例,初始线程池大小为1,核心池大小为 5,最大池大小为10,队列为100。

随着请求的增加, 将创建最多5个线程,然后将任务添加到 队列,直到它达到100。当队列是完整的新线程将 创建到 maxPoolSize。一旦所有的线程都在使用和 队列是完整的任务将被拒绝。随着队列减少,这样做 活动线程的数量。

您可以在 javadoc.http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ThreadPoolExecutor.html中找到术语 corepoolsize 和 maxpoolsize 的定义

上面的链接有你问题的答案。但是,先说清楚。应用程序将继续创建线程,直到达到 corePoolSize。 我认为这里的想法是,这些线程应该足以处理任务的流入。如果在创建 corePoolSize 线程之后出现新任务,则任务将排队。一旦队列满了,执行程序将开始创建新线程。这是一种平衡。它本质上意味着任务的流入超过了处理能力。因此,Execator 将再次开始创建新线程,直到达到最大线程数。同样,当且仅当队列已满时,将创建新线程。

如果您决定手动创建一个 ThreadPoolExecutor而不是使用 Executors工厂类,那么您将需要使用它的一个构造函数来创建和配置一个 ThreadPoolExecutor。这个类的最广泛的构造函数是:

public ThreadPoolExecutor(
int corePoolSize,
int maxPoolSize,
long keepAlive,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler
);

如您所见,您可以配置:

  • 核心池的大小(线程池将努力保持的大小)。
  • 最大池大小。
  • 保持活动的时间,这是一个时间之后,一个空闲的线程有资格被拆除。
  • 保存等待执行的任务的工作队列。
  • 拒绝任务提交时应用的策略。

限制排队任务的数量

限制正在执行的并发任务的数量,调整线程池的大小,在可预测性和稳定性方面对应用程序及其执行环境有巨大的好处: 无限制的线程创建最终将耗尽运行时资源,应用程序可能会因此遇到严重的性能问题,甚至可能导致应用程序不稳定。

这只是问题的一部分的解决方案: 您限制了正在执行的任务的数量,但是没有限制可以提交和排队等待以后执行的作业的数量。应用程序将在以后经历资源短缺,但是如果提交率始终超过执行率,它最终将经历资源短缺。

解决这个问题的办法是: 为执行器提供一个阻塞队列来保存等待的任务。在队列填满的情况下,提交的任务将被“拒绝”。 在拒绝任务提交时调用 RejectedExecutionHandler,这就是为什么在前一项中引用了拒绝动词。您可以实现自己的拒绝策略,或者使用框架提供的内置策略之一。

默认拒绝策略让执行者抛出一个 RejectedExecutionException。然而,其他内置策略允许您:

  • 默默地丢掉工作。
  • 丢弃最旧的作业并尝试重新提交最后一个作业。
  • 在调用方的线程上执行被拒绝的任务。

如果 运行线程 > corePoolSize & < maxPoolSize,然后创建一个新的线程,如果总任务队列已满,新的一个到达。

来自文档: (如果运行的线程多于 CorePoolSize但少于 MaxumPoolSize 最大池大小,则只有在队列已满时才会创建新线程。)

举个简单的例子,

ThreadPoolExecutor executorPool = new ThreadPoolExecutor(5, 10, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50));

这里,5是 corePoolSize -意味着 Jvm 将为前5个任务的新任务创建新线程。其他任务将被添加到队列中,直到队列满员(50个任务)。

10是 maxPoolSize -JVM 可以创建最多10个线程。 意味着如果已经有5个任务/线程正在运行,并且队列已经满了50个挂起的任务,并且如果又有一个新的请求/任务到达队列,那么 JVM 将创建最多10个新线程(总线程 = 前5 + 新5) ;

New ArrayBlockingQueue (50) = 是一个总队列大小-它可以在其中排队50个任务。

一旦所有10个线程运行,如果新任务到达,那么新任务将被拒绝。

SUN 在内部创建线程的规则:

  1. 如果线程数小于 corePoolSize,则创建一个新的 Thread 来运行新任务。

  2. 如果线程数等于(或大于) corePoolSize,则将任务放入队列。

  3. 如果队列已满,且线程数小于 maxPoolSize,则创建一个新线程以在其中运行任务。

  4. 如果队列已满,且线程数大于或等于 maxPoolSize,则拒绝该任务。

Hope 这里是 HelpFul... 如果我说错了请纠正我..。

这个的博客中有一个很好的解释:

插图

public class ThreadPoolExecutorExample {


public static void main (String[] args) {
createAndRunPoolForQueue(new ArrayBlockingQueue<Runnable>(3), "Bounded");
createAndRunPoolForQueue(new LinkedBlockingDeque<>(), "Unbounded");
createAndRunPoolForQueue(new SynchronousQueue<Runnable>(), "Direct hand-off");
}


private static void createAndRunPoolForQueue (BlockingQueue<Runnable> queue,
String msg) {
System.out.println("---- " + msg + " queue instance = " +
queue.getClass()+ " -------------");


ThreadPoolExecutor e = new ThreadPoolExecutor(2, 5, Long.MAX_VALUE,
TimeUnit.NANOSECONDS, queue);


for (int i = 0; i < 10; i++) {
try {
e.execute(new Task());
} catch (RejectedExecutionException ex) {
System.out.println("Task rejected = " + (i + 1));
}
printStatus(i + 1, e);
}


e.shutdownNow();


System.out.println("--------------------\n");
}


private static void printStatus (int taskSubmitted, ThreadPoolExecutor e) {
StringBuilder s = new StringBuilder();
s.append("poolSize = ")
.append(e.getPoolSize())
.append(", corePoolSize = ")
.append(e.getCorePoolSize())
.append(", queueSize = ")
.append(e.getQueue()
.size())
.append(", queueRemainingCapacity = ")
.append(e.getQueue()
.remainingCapacity())
.append(", maximumPoolSize = ")
.append(e.getMaximumPoolSize())
.append(", totalTasksSubmitted = ")
.append(taskSubmitted);


System.out.println(s.toString());
}


private static class Task implements Runnable {


@Override
public void run () {
while (true) {
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
break;
}
}
}
}
}

产出:

---- Bounded queue instance = class java.util.concurrent.ArrayBlockingQueue -------------
poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 3, maximumPoolSize = 5, totalTasksSubmitted = 1
poolSize = 2, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 3, maximumPoolSize = 5, totalTasksSubmitted = 2
poolSize = 2, corePoolSize = 2, queueSize = 1, queueRemainingCapacity = 2, maximumPoolSize = 5, totalTasksSubmitted = 3
poolSize = 2, corePoolSize = 2, queueSize = 2, queueCapacity = 1, maximumPoolSize = 5, totalTasksSubmitted = 4
poolSize = 2, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 5
poolSize = 3, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 6
poolSize = 4, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 7
poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 8
Task rejected = 9
poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 9
Task rejected = 10
poolSize = 5, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 10
--------------------


---- Unbounded queue instance = class java.util.concurrent.LinkedBlockingDeque -------------
poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 2147483647, maximumPoolSize = 5, totalTasksSubmitted = 1
poolSize = 2, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 2147483647, maximumPoolSize = 5, totalTasksSubmitted = 2
poolSize = 2, corePoolSize = 2, queueSize = 1, queueRemainingCapacity = 2147483646, maximumPoolSize = 5, totalTasksSubmitted = 3
poolSize = 2, corePoolSize = 2, queueSize = 2, queueRemainingCapacity = 2147483645, maximumPoolSize = 5, totalTasksSubmitted = 4
poolSize = 2, corePoolSize = 2, queueSize = 3, queueRemainingCapacity = 2147483644, maximumPoolSize = 5, totalTasksSubmitted = 5
poolSize = 2, corePoolSize = 2, queueSize = 4, queueRemainingCapacity = 2147483643, maximumPoolSize = 5, totalTasksSubmitted = 6
poolSize = 2, corePoolSize = 2, queueSize = 5, queueRemainingCapacity = 2147483642, maximumPoolSize = 5, totalTasksSubmitted = 7
poolSize = 2, corePoolSize = 2, queueSize = 6, queueRemainingCapacity = 2147483641, maximumPoolSize = 5, totalTasksSubmitted = 8
poolSize = 2, corePoolSize = 2, queueSize = 7, queueRemainingCapacity = 2147483640, maximumPoolSize = 5, totalTasksSubmitted = 9
poolSize = 2, corePoolSize = 2, queueSize = 8, queueRemainingCapacity = 2147483639, maximumPoolSize = 5, totalTasksSubmitted = 10
--------------------


---- Direct hand-off queue instance = class java.util.concurrent.SynchronousQueue -------------
poolSize = 1, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 1
poolSize = 2, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 2
poolSize = 3, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 3
poolSize = 4, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 4
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 5
Task rejected = 6
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 6
Task rejected = 7
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 7
Task rejected = 8
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 8
Task rejected = 9
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 9
Task rejected = 10
poolSize = 5, corePoolSize = 2, queueSize = 0, queueRemainingCapacity = 0, maximumPoolSize = 5, totalTasksSubmitted = 10
--------------------




Process finished with exit code 0

并发,线程池执行器

  public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task.  The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread.  If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

摘自 Java 并发性要素:

CorePoolSize : ThreadPoolExecector 有一个属性 corePoolSize,用于确定它将启动多少个线程 直到队列满时才启动新线程

MaximumPoolSize : 此属性决定最大启动多少个线程。 为了没有上限,使用 MAX _ VALUE

在提交新任务时,了解 ThreadPoolExecutor的内部行为有助于我理解 corePoolSizemaximumPoolSize的区别。

让我们:

  • N是池中的线程数,getPoolSize()。活动线程 + 空闲线程。
  • T是提交给执行者/池的任务量。
  • 核心池大小为 C,即 getCorePoolSize()。对于传入任务 在新任务进入队列之前,每个池最多可以创建多少个线程。
  • M是池的最大大小 getMaximumPoolSize()。池可以分配的最大线程数量。

提交新任务时,Java 中 ThreadPoolExecutor的行为:

  • 对于 N <= C,不会为空闲线程分配新的传入任务,而是创建一个新线程。
  • 对于 N > C,如果有空闲线程,则在那里分配新任务。
  • 对于 N > C,如果没有空闲线程,则将新任务放入队列中
  • 当队列满了 时,我们创建到 M的新线程。如果到达 M,则拒绝任务。在这里不重要的是,我们不创建新线程,直到队列满了!

资料来源:

例子

例如,corePoolSize = 0maximumPoolSize = 10的队列容量为 50

这将导致池中只有一个活动线程,直到队列中有50个项为止。

executor.execute(task #1):


before task #1 submitted to executor: java.util.concurrent.ThreadPoolExecutor@c52dafe[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]


after task #1 submitted to executor: java.util.concurrent.ThreadPoolExecutor@c52dafe[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]


[task #1 immediately queued and kicked in b/c the very first thread is created when `workerCountOf(recheck) == 0`]


execute(task #2):


before task #2 submitted to executor: java.util.concurrent.ThreadPoolExecutor@c52dafe[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]


after task #2 submitted to executor: java.util.concurrent.ThreadPoolExecutor@c52dafe[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]


[task #2 not starting before #1 is done]


... executed a few tasks...


execute(task #19)


before task #19 submitted to executor: java.util.concurrent.ThreadPoolExecutor@735afe38[Running, pool size = 1, active threads = 1, queued tasks = 17, completed tasks = 0]


after task #19 submitted to executor: java.util.concurrent.ThreadPoolExecutor@735afe38[Running, pool size = 1, active threads = 1, queued tasks = 18, completed tasks = 0]


...


execute(task #51)


before task submitted to executor: java.util.concurrent.ThreadPoolExecutor@735afe38[Running, pool size = 1, active threads = 1, queued tasks = 50, completed tasks = 0]


after task submitted to executor: java.util.concurrent.ThreadPoolExecutor@735afe38[Running, pool size = 2, active threads = 2, queued tasks = 50, completed tasks = 0]


Queue is full.
A new thread was created as the queue was full.


例如,corePoolSize = 10maximumPoolSize = 10的队列容量为 50

这将在池中产生10个活动线程。当队列中有50个项时,任务将被拒绝。

execute(task #1)


before task #1 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]


after task #1 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]


execute(task #2)


before task #2 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 0]


after task #2 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]


execute(task #3)


before task #3 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]


after task #3 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]


... executed a few tasks...


execute(task #11)


before task #11 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 10, active threads = 10, queued tasks = 0, completed tasks = 0]


after task #11 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 10, active threads = 10, queued tasks = 1, completed tasks = 0]


... executed a few tasks...


execute(task #51)
before task #51 submitted to executor: java.util.concurrent.ThreadPoolExecutor@32d9e072[Running, pool size = 10, active threads = 10, queued tasks = 50, completed tasks = 0]


Task was rejected as we have reached `maximumPoolSize`.


来源

线程池执行器池大小的规则

关于 ThreadPoolExecutor's池大小的规则通常是不被理解的,因为它没有按照您认为应该的方式或您希望它按照的方式工作。

举个例子。起始线程池大小为1,核心池大小为5,最大池大小为10,队列为100。

Sun 的方法: 当线程中的请求最多创建5个,然后将任务添加到队列中,直到队列达到100个。当队列满员时,将创建到 maxPoolSize的新线程。一旦所有的线程都在使用,并且队列已满,将拒绝执行任务。随着队列的减少,活动线程的数量也随之减少。

用户预期的方式: 当线程中的请求最多创建10个时,任务将被添加到队列中,直到达到100个时才被拒绝。线程的数量将以最大值重命名,直到队列为空。当队列为空时,线程将逐渐消失,直到剩下 corePoolSize

不同之处在于,用户希望更早地开始增加池大小,并希望队列更小,而 Sun 方法希望保持池大小较小,并且只在负载变得过大时才增加池大小。

以下是 Sun 关于线程创建的简单规则:

  1. 如果线程数小于 corePoolSize,则创建一个新线程来运行新任务。
  2. 如果线程数等于(或大于) corePoolSize,则将任务放入队列中。
  3. 如果队列已满,且线程数小于 maxPoolSize,则创建一个新线程以在其中运行任务。
  4. 如果队列已满,且线程数大于或等于 maxPoolSize,则拒绝该任务。 简而言之,只有当队列填满时才会创建新线程,所以如果使用无限制队列,那么线程的数量不会超过 corePoolSize

要获得更全面的解释,请参考以下文档: ThreadPoolExecutor API 文档。

有一个非常好的论坛帖子,它告诉你通过 ThreadPoolExecutor与代码示例的工作方式: http://forums.sun.com/thread.jspa?threadID=5401400&tstart=0

更多信息: http://forums.sun.com/thread.jspa?threadID=5224557&tstart=450

在图片中,考虑只有额外的任务正在发生

enter image description here

根据文件:

任何 BlockingQueue 都可用于传输和保存提交的任务。 此队列的使用与池大小交互: 如果少于 核心池大小的线程正在运行,Execator 总是喜欢添加一个 如果核心池大小或更多线程是 运行时,执行者总是倾向于将请求排队,而不是 添加新线程。如果请求不能排队,则新线程为 创建,除非这将超过 maxumPoolSize,在这种情况下, 任务将被拒绝。

这意味着核心池大小是一个阈值,超过这个阈值,执行服务会选择将任务排队,而不是产生一个新线程。如果在接收任务时,线程的数量小于核心池的大小,则创建一个新线程,从而增加活动线程。