如何使用ExecutorService等待所有线程完成?

我需要一次执行一定数量的任务4,就像这样:

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
//...wait for completion somehow

当所有这些都完成后,我如何得到通知?现在我想不出比设置一些全局任务计数器更好的方法,并在每个任务结束时减少它,然后在无限循环中监视这个计数器变成0;或获取一个期货列表,并在无限循环监视器isDone为所有它们。不涉及无限循环的更好的解决方案是什么?

谢谢。

406576 次浏览

基本上在ExecutorService上调用shutdown(),然后调用awaitTermination():

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
taskExecutor.shutdown();
try {
taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
...
}

使用CountDownLatch:

CountDownLatch latch = new CountDownLatch(totalNumberOfTasks);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}


try {
latch.await();
} catch (InterruptedException E) {
// handle
}

在任务中(在try / finally中附上)

latch.countDown();

你可以把你的任务包装在另一个可运行文件中,它会发送通知:

taskExecutor.execute(new Runnable() {
public void run() {
taskStartedNotification();
new MyTask().run();
taskFinishedNotification();
}
});

您可以使用自己的ExecutorCompletionService子类来包装taskExecutor,并使用自己的BlockingQueue实现来在每个任务完成时获得通知,并在完成的任务数量达到预期目标时执行任何回调或其他操作。

ExecutorService.invokeAll()为你做这件事。

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List<Callable<?>> tasks; // your tasks
// invokeAll() returns when all tasks are complete
List<Future<?>> futures = taskExecutor.invokeAll(tasks);

Java 5及以后版本中的CyclicBarrier类就是为这类事情而设计的。

这只是我的个人意见。 为了克服CountDownLatch预先知道任务数的要求,你可以使用简单的Semaphore.

ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
int numberOfTasks=0;
Semaphore s=new Semaphore(0);
while(...) {
taskExecutor.execute(new MyTask());
numberOfTasks++;
}


try {
s.aquire(numberOfTasks);
...

在你的任务中调用s.release()就像调用latch.countDown();一样

你也可以使用期货列表:

List<Future> futures = new ArrayList<Future>();
// now add to it:
futures.add(executorInstance.submit(new Callable<Void>() {
public Void call() throws IOException {
// do something
return null;
}
}));

然后当你想要连接所有线程时,它本质上等同于连接每个线程,(额外的好处是它会从子线程重新引发异常到主线程):

for(Future f: this.futures) { f.get(); }

基本上,诀窍是每次在每个Future上调用.get(),而不是在(all或each)上无限循环调用isDone()。因此,只要最后一个线程结束,你就可以保证“继续前进”并通过这个块。需要注意的是,由于.get()调用会重新引发异常,如果其中一个线程死亡,则可能会在其他线程完成之前引发异常[为了避免这种情况,可以在get调用周围添加catch ExecutionException]。另一个警告是,它保留了对所有线程的引用,所以如果它们有线程局部变量,它们不会被收集,直到你通过这个块(尽管你可能能够绕过这个,如果它成为一个问题,通过从数组列表中删除Future)。如果你想知道哪个Future“先完成”,你可以使用一些类似https://stackoverflow.com/a/31885029/32453的东西

我刚刚写了一个示例程序来解决你的问题。这里没有给出简洁的实现,所以我将添加一个。虽然你可以使用executor.shutdown()executor.awaitTermination(),但这不是最佳实践,因为不同线程所花费的时间是不可预测的。

ExecutorService es = Executors.newCachedThreadPool();
List<Callable<Integer>> tasks = new ArrayList<>();


for (int j = 1; j <= 10; j++) {
tasks.add(new Callable<Integer>() {


@Override
public Integer call() throws Exception {
int sum = 0;
System.out.println("Starting Thread "
+ Thread.currentThread().getId());


for (int i = 0; i < 1000000; i++) {
sum += i;
}


System.out.println("Stopping Thread "
+ Thread.currentThread().getId());
return sum;
}


});
}


try {
List<Future<Integer>> futures = es.invokeAll(tasks);
int flag = 0;


for (Future<Integer> f : futures) {
Integer res = f.get();
System.out.println("Sum: " + res);
if (!f.isDone())
flag = 1;
}


if (flag == 0)
System.out.println("SUCCESS");
else
System.out.println("FAILED");


} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

在executor getActiveCount() -中有一个方法可以给出活动线程的计数。

在跨越线程之后,我们可以检查activeCount()值是否为0。一旦该值为零,就意味着当前没有活动线程在运行,这意味着任务已经完成:

while (true) {
if (executor.getActiveCount() == 0) {
//ur own piece of code
break;
}
}

有点晚了,但为了完成…

不要“等待”所有的任务完成,你可以用好莱坞的原则来思考,“不要打电话给我,我会打电话给你”;-等我说完。 我认为结果代码更优雅…

番石榴提供了一些有趣的工具来实现这一点。

一个例子:

将ExecutorService包装成ListeningExecutorService:

ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

提交一个可调用对象的集合来执行::

for (Callable<Integer> callable : callables) {
ListenableFuture<Integer> lf = service.submit(callable);
// listenableFutures is a collection
listenableFutures.add(lf)
});

现在最重要的部分:

ListenableFuture<List<Integer>> lf = Futures.successfulAsList(listenableFutures);

附加一个回调到ListenableFuture,当所有future完成时,你可以使用它来得到通知:

Futures.addCallback(lf, new FutureCallback<List<Integer>> () {
@Override
public void onSuccess(List<Integer> result) {
// do something with all the results
}


@Override
public void onFailure(Throwable t) {
// log failure
}
});

这也提供了一个好处,一旦处理完成,您就可以在一个地方收集所有的结果……

更多信息在这里

这可能会有所帮助

Log.i(LOG_TAG, "shutting down executor...");
executor.shutdown();
while (true) {
try {
Log.i(LOG_TAG, "Waiting for executor to terminate...");
if (executor.isTerminated())
break;
if (executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
break;
}
} catch (InterruptedException ignored) {}
}

只是在这里提供更多不同于使用闩锁/屏障的选择。 你也可以使用CompletionService得到部分结果,直到它们全部完成

From Java Concurrency in practice: “如果您有一批计算要提交给Executor,并且您希望检索它们的结果 可用时,您可以保留与每个任务关联的Future,并通过调用get来重复轮询完成 超时为0。这是可能的,但是单调乏味的。幸运的是,这里有更好的方法:一个完成服务。" < / p >

这里是实现

public class TaskSubmiter {
private final ExecutorService executor;
TaskSubmiter(ExecutorService executor) { this.executor = executor; }
void doSomethingLarge(AnySourceClass source) {
final List<InterestedResult> info = doPartialAsyncProcess(source);
CompletionService<PartialResult> completionService = new ExecutorCompletionService<PartialResult>(executor);
for (final InterestedResult interestedResultItem : info)
completionService.submit(new Callable<PartialResult>() {
public PartialResult call() {
return InterestedResult.doAnOperationToGetPartialResult();
}
});


try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<PartialResult> f = completionService.take();
PartialResult PartialResult = f.get();
processThisSegment(PartialResult);
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
catch (ExecutionException e) {
throw somethinghrowable(e.getCause());
}
}
}

遵循以下方法之一。

  1. 遍历所有未来任务,从submitExecutorService上返回,并根据Kiran的建议在Future对象上使用阻塞调用get()检查状态
  2. ExecutorService上使用invokeAll()
  3. CountDownLatch
  4. ForkJoinPoolExecutors.html # newWorkStealingPool
  5. 按照正确的顺序使用ThreadPoolExecutor的shutdown, awaitTermination, shutdownNow api

相关的SE问题:

CountDownLatch在Java多线程中是如何使用的?< / >

如何正确关闭java ExecutorService

你可以在这个跑步者类上调用waitTillDone ():

Runner runner = Runner.runner(4); // create pool with 4 threads in thread pool


while(...) {
runner.run(new MyTask()); // here you submit your task
}




runner.waitTillDone(); // and this blocks until all tasks are finished (or failed)




runner.shutdown(); // once you done you can shutdown the runner

你可以重用这个类,并在调用shutdown()之前调用waitTillDone()任意多次,加上你的代码是非常的简单。你也可以在前面不需要知道 任务数量

要使用它,只需将gradle/maven compile 'com.github.matejtymes:javafixes:1.3.1'依赖项添加到你的项目中。

详情请点击这里:

https://github.com/MatejTymes/JavaFixes

在Java8中,你可以用CompletableFuture来实现:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();

你应该使用executorService.shutdown()executorService.awaitTermination方法。

示例如下:

public class ScheduledThreadPoolExample {


public static void main(String[] args) throws InterruptedException {
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
executorService.scheduleAtFixedRate(() -> System.out.println("process task."),
0, 1, TimeUnit.SECONDS);


TimeUnit.SECONDS.sleep(10);
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.DAYS);
}


}

我们可以使用流API来处理流。请参阅下面的片段

final List<Runnable> tasks = ...; //or any other functional interface
tasks.stream().parallel().forEach(Runnable::run) // Uses default pool


//alternatively to specify parallelism
new ForkJoinPool(15).submit(
() -> tasks.stream().parallel().forEach(Runnable::run)
).get();

你可以使用下面的代码:

public class MyTask implements Runnable {


private CountDownLatch countDownLatch;


public MyTask(CountDownLatch countDownLatch {
this.countDownLatch = countDownLatch;
}


@Override
public void run() {
try {
//Do somethings
//
this.countDownLatch.countDown();//important
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}


CountDownLatch countDownLatch = new CountDownLatch(NUMBER_OF_TASKS);
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
for (int i = 0; i < NUMBER_OF_TASKS; i++){
taskExecutor.execute(new MyTask(countDownLatch));
}
countDownLatch.await();
System.out.println("Finish tasks");

所以我把我的答案贴在这里,以防有人想要一个更简单的方法来做到这一点

ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture[] futures = new CompletableFuture[10];
int i = 0;
while (...) {
futures[i++] =  CompletableFuture.runAsync(runner, executor);
}


CompletableFuture.allOf(futures).join(); // THis will wait until all future ready.

这是我的解决方案,基于“亚当·天行者”的技巧,它很有效

package frss.main;


import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class TestHilos {


void procesar() {
ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream().map(task -> CompletableFuture.runAsync(task, es)).toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();


System.out.println("FIN DEL PROCESO DE HILOS");
}


private List<Runnable> getTasks() {
List<Runnable> tasks = new ArrayList<Runnable>();


Hilo01 task1 = new Hilo01();
tasks.add(task1);


Hilo02 task2 = new Hilo02();
tasks.add(task2);
return tasks;
}


private class Hilo01 extends Thread {


@Override
public void run() {
System.out.println("HILO 1");
}


}


private class Hilo02 extends Thread {


@Override
public void run() {
try {
sleep(2000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("HILO 2");
}


}




public static void main(String[] args) {
TestHilos test = new TestHilos();
test.procesar();
}
}

这里有两个选择,只是有点困惑,哪个是最好的。

选项1:

ExecutorService es = Executors.newFixedThreadPool(4);
List<Runnable> tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, es))
.toArray(CompletableFuture[]::new);
CompletableFuture.allOf(futures).join();
es.shutdown();

选项2:

ExecutorService es = Executors.newFixedThreadPool(4);
List< Future<?>> futures = new ArrayList<>();
for(Runnable task : taskList) {
futures.add(es.submit(task));
}


for(Future<?> future : futures) {
try {
future.get();
}catch(Exception e){
// do logging and nothing else
}
}
es.shutdown();

这里放入future.get();试抓是个好主意,对吧?

我创建了以下工作示例。其思想是有一种方法来处理具有许多线程(由numberOfTasks/阈值通过编程确定)的任务池(我使用队列作为示例),并等待所有线程完成后继续进行其他处理。

import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


/** Testing CountDownLatch and ExecutorService to manage scenario where
* multiple Threads work together to complete tasks from a single
* resource provider, so the processing can be faster. */
public class ThreadCountDown {


private CountDownLatch threadsCountdown = null;
private static Queue<Integer> tasks = new PriorityQueue<>();


public static void main(String[] args) {
// Create a queue with "Tasks"
int numberOfTasks = 2000;
while(numberOfTasks-- > 0) {
tasks.add(numberOfTasks);
}


// Initiate Processing of Tasks
ThreadCountDown main = new ThreadCountDown();
main.process(tasks);
}


/* Receiving the Tasks to process, and creating multiple Threads
* to process in parallel. */
private void process(Queue<Integer> tasks) {
int numberOfThreads = getNumberOfThreadsRequired(tasks.size());
threadsCountdown = new CountDownLatch(numberOfThreads);
ExecutorService threadExecutor = Executors.newFixedThreadPool(numberOfThreads);


//Initialize each Thread
while(numberOfThreads-- > 0) {
System.out.println("Initializing Thread: "+numberOfThreads);
threadExecutor.execute(new MyThread("Thread "+numberOfThreads));
}


try {
//Shutdown the Executor, so it cannot receive more Threads.
threadExecutor.shutdown();
threadsCountdown.await();
System.out.println("ALL THREADS COMPLETED!");
//continue With Some Other Process Here
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}


/* Determine the number of Threads to create */
private int getNumberOfThreadsRequired(int size) {
int threshold = 100;
int threads = size / threshold;
if( size > (threads*threshold) ){
threads++;
}
return threads;
}


/* Task Provider. All Threads will get their task from here */
private synchronized static Integer getTask(){
return tasks.poll();
}


/* The Threads will get Tasks and process them, while still available.
* When no more tasks available, the thread will complete and reduce the threadsCountdown */
private class MyThread implements Runnable {


private String threadName;


protected MyThread(String threadName) {
super();
this.threadName = threadName;
}


@Override
public void run() {
Integer task;
try{
//Check in the Task pool if anything pending to process
while( (task = getTask()) != null ){
processTask(task);
}
}catch (Exception ex){
ex.printStackTrace();
}finally {
/*Reduce count when no more tasks to process. Eventually all
Threads will end-up here, reducing the count to 0, allowing
the flow to continue after threadsCountdown.await(); */
threadsCountdown.countDown();
}
}


private void processTask(Integer task){
try{
System.out.println(this.threadName+" is Working on Task: "+ task);
}catch (Exception ex){
ex.printStackTrace();
}
}
}
}

希望能有所帮助!


ExecutorService WORKER_THREAD_POOL
= Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
WORKER_THREAD_POOL.submit(() -> {
try {
// doSomething();
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}


// wait for the latch to be decremented by the two remaining threads
latch.await();

如果doSomething()抛出一些其他异常,latch.countDown()似乎不会执行,那么我应该怎么做?

如果你连续使用了更多的ExecutionServices线程,并且想要等待每个EXECUTIONSERVICE完成。最好的办法是像下面这样;

ExecutorService executer1 = Executors.newFixedThreadPool(THREAD_SIZE1);
for (<loop>) {
executer1.execute(new Runnable() {
@Override
public void run() {
...
}
});
}
executer1.shutdown();


try{
executer1.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);


ExecutorService executer2 = Executors.newFixedThreadPool(THREAD_SIZE2);
for (true) {
executer2.execute(new Runnable() {
@Override
public void run() {
...
}
});
}
executer2.shutdown();
} catch (Exception e){
...
}

使用ExecutorService的干净方式

 List<Future<Void>> results = null;
try {
List<Callable<Void>> tasks = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(4);
results = executorService.invokeAll(tasks);
} catch (InterruptedException ex) {
...
} catch (Exception ex) {
...
}

项目织机AutoCloseable执行器服务上Try-with-Resources语法

项目织机< em > < / em >寻求为Java中的并发能力添加新特性。

其中一个特性是使ExecutorService AutoCloseable. conf。这意味着每个ExecutorService实现都将提供一个close方法。这意味着我们可以使用try-with-resources语法自动关闭ExecutorService对象。

ExecutorService#close方法会阻塞,直到所有提交的任务都完成。使用close代替调用shutdown &awaitTermination

作为AutoCloseable有助于Project Loom将“结构化并发性”引入Java的尝试。

try (
ExecutorService executorService = Executors.… ;
) {
// Submit your `Runnable`/`Callable` tasks to the executor service.
…
}
// At this point, flow-of-control blocks until all submitted tasks are done/canceled/failed.
// After this point, the executor service will have been automatically shutdown, wia `close` method called by try-with-resources syntax.

有关Project Loom的更多信息,请搜索由Ron Pressler和Project Loom团队的其他人所做的演讲和采访。关注更近期的,如Project Loom的发展。

Project Loom技术的实验版本是现在有时间,基于早期访问的Java 18