如何等待多个线程完成?

简单地等待所有线程进程完成的方法是什么? 例如,假设我有:

public class DoSomethingInAThread implements Runnable{


public static void main(String[] args) {
for (int n=0; n<1000; n++) {
Thread t = new Thread(new DoSomethingInAThread());
t.start();
}
// wait for all threads' run() methods to complete before continuing
}


public void run() {
// do something here
}




}

我如何改变这一点,使 main()方法暂停在注释,直到所有线程的 run()方法退出? 谢谢!

181120 次浏览

You can do it with the Object “ ThreadGroup”及其参数 activeCount:

如果列出线程的列表,则可以循环访问它们,然后。Join () ,当所有线程都具有。我还没试过。

Http://docs.oracle.com/javase/8/docs/api/java/lang/thread.html#join

将所有线程放入一个数组中,启动它们,然后执行一个循环

for(i = 0; i < threads.length; i++)
threads[i].join();

每个连接都将阻塞,直到相应的线程完成。线程的完成顺序可能与您加入它们的顺序不同,但这不是问题: 当循环退出时,所有线程都已完成。

One way would be to make a List of Threads, create and launch each thread, while adding it to the list. Once everything is launched, loop back through the list and call join() on each one. It doesn't matter what order the threads finish executing in, all you need to know is that by the time that second loop finishes executing, every thread will have completed.

更好的方法是使用 执行服务及其相关方法:

List<Callable> callables = ... // assemble list of Callables here
// Like Runnable but can return a value
ExecutorService execSvc = Executors.newCachedThreadPool();
List<Future<?>> results = execSvc.invokeAll(callables);
// Note: You may not care about the return values, in which case don't
//       bother saving them

使用 ExecutorService (以及来自 Java5的 并发实用程序的所有新内容)是非常灵活的,而上面的示例几乎只是触及皮毛。

完全避免使用 Thread 类,而是使用 java.util.current 中提供的更高级的抽象

ExecutorService 类提供了 方法 invkeAll,它似乎可以完成您想要的任务。

根据您的需要,您可能还需要检查 java.util.conconconclage 包中的 CountDownLatch 和 CyclicBarris 类。如果希望线程相互等待,或者希望对线程的执行方式进行更细粒度的控制(例如,在线程的内部执行中等待另一个线程设置某种状态) ,它们可能非常有用。您还可以使用 CountDownLatch 向所有线程发出同时启动的信号,而不是在循环中逐个启动它们。标准的 API 文档就是一个例子,另外还使用另一个 CountDownLatch 等待所有线程完成执行。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;


public class DoSomethingInAThread implements Runnable
{
public static void main(String[] args) throws ExecutionException, InterruptedException
{
//limit the number of actual threads
int poolSize = 10;
ExecutorService service = Executors.newFixedThreadPool(poolSize);
List<Future<Runnable>> futures = new ArrayList<Future<Runnable>>();


for (int n = 0; n < 1000; n++)
{
Future f = service.submit(new DoSomethingInAThread());
futures.add(f);
}


// wait for all tasks to complete before continuing
for (Future<Runnable> f : futures)
{
f.get();
}


//shut down the executor service so that this thread can exit
service.shutdownNow();
}


public void run()
{
// do something here
}
}

Create the thread object inside the first for loop.

for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(new Runnable() {
public void run() {
// some code to run in parallel
}
});
threads[i].start();
}

然后这里的每个人都在说。

for(i = 0; i < threads.length; i++)
threads[i].join();

考虑在 Javadocs中使用 java.util.concurrent.CountDownLatch.example

正如 Martin K 所建议的,java.util.concurrent.CountDownLatch似乎是一个更好的解决方案。只是添加了一个相同的例子

     public class CountDownLatchDemo
{


public static void main (String[] args)
{
int noOfThreads = 5;
// Declare the count down latch based on the number of threads you need
// to wait on
final CountDownLatch executionCompleted = new CountDownLatch(noOfThreads);
for (int i = 0; i < noOfThreads; i++)
{
new Thread()
{


@Override
public void run ()
{


System.out.println("I am executed by :" + Thread.currentThread().getName());
try
{
// Dummy sleep
Thread.sleep(3000);
// One thread has completed its job
executionCompleted.countDown();
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}


}.start();
}


try
{
// Wait till the count down latch opens.In the given case till five
// times countDown method is invoked
executionCompleted.await();
System.out.println("All over");
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}


}

可以使用 CountDownLatch代替旧的 API join()。为了满足您的要求,我已经修改了您的代码如下。

import java.util.concurrent.*;
class DoSomethingInAThread implements Runnable{
CountDownLatch latch;
public DoSomethingInAThread(CountDownLatch latch){
this.latch = latch;
}
public void run() {
try{
System.out.println("Do some thing");
latch.countDown();
}catch(Exception err){
err.printStackTrace();
}
}
}


public class CountDownLatchDemo {
public static void main(String[] args) {
try{
CountDownLatch latch = new CountDownLatch(1000);
for (int n=0; n<1000; n++) {
Thread t = new Thread(new DoSomethingInAThread(latch));
t.start();
}
latch.await();
System.out.println("In Main thread after completion of 1000 threads");
}catch(Exception err){
err.printStackTrace();
}
}
}

解说 :

  1. 根据您的要求,CountDownLatch已经初始化为给定的计数1000。

  2. 每个工作线程 DoSomethingInAThread将减少已经在构造函数中传递的 CountDownLatch

  3. 主线程 CountDownLatchDemo await()直到计数为零。一旦计数为零,您将得到输出中的下一行。

    In Main thread after completion of 1000 threads
    

More info from oracle documentation page

public void await()
throws InterruptedException

使当前线程等待,直到锁存器倒计时为零,除非线程被中断。

其他选择请参考相关的 SE 问题:

等待所有线程在 Java 中完成它们的工作

作为 CountDownLatch的替代品,你也可以使用 循环屏障

public class ThreadWaitEx {
static CyclicBarrier barrier = new CyclicBarrier(100, new Runnable(){
public void run(){
System.out.println("clean up job after all tasks are done.");
}
});
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
Thread t = new Thread(new MyCallable(barrier));
t.start();
}
}


}


class MyCallable implements Runnable{
private CyclicBarrier b = null;
public MyCallable(CyclicBarrier b){
this.b = b;
}
@Override
public void run(){
try {
//do something
System.out.println(Thread.currentThread().getName()+" is waiting for barrier after completing his job.");
b.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}

在这种情况下,要使用 CyclicBarrier.wait ()应该是最后一个语句,也就是说,当线程完成它的工作时。CyclicBlock 可以通过其复位()方法再次使用。引用 javadocs 的话:

CyclicBlock 支持一个可选的 Runnable 命令,该命令在团队中的最后一个线程到达之后,但在任何线程被释放之前,每个障碍点运行一次。这个屏障操作对于在任何一方继续之前更新共享状态是有用的。

ABc0对我没有帮助,看看 Kotlin 的这个样本:

    val timeInMillis = System.currentTimeMillis()
ThreadUtils.startNewThread(Runnable {
for (i in 1..5) {
val t = Thread(Runnable {
Thread.sleep(50)
var a = i
kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
Thread.sleep(200)
for (j in 1..5) {
a *= j
Thread.sleep(100)
kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
}
kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
})
t.start()
}
})

结果是:

Thread-5|a=5
Thread-1|a=1
Thread-3|a=3
Thread-2|a=2
Thread-4|a=4
Thread-2|2*1=2
Thread-3|3*1=3
Thread-1|1*1=1
Thread-5|5*1=5
Thread-4|4*1=4
Thread-1|2*2=2
Thread-5|10*2=10
Thread-3|6*2=6
Thread-4|8*2=8
Thread-2|4*2=4
Thread-3|18*3=18
Thread-1|6*3=6
Thread-5|30*3=30
Thread-2|12*3=12
Thread-4|24*3=24
Thread-4|96*4=96
Thread-2|48*4=48
Thread-5|120*4=120
Thread-1|24*4=24
Thread-3|72*4=72
Thread-5|600*5=600
Thread-4|480*5=480
Thread-3|360*5=360
Thread-1|120*5=120
Thread-2|240*5=240
Thread-1|TaskDurationInMillis = 765
Thread-3|TaskDurationInMillis = 765
Thread-4|TaskDurationInMillis = 765
Thread-5|TaskDurationInMillis = 765
Thread-2|TaskDurationInMillis = 765

现在让我使用 join()来处理线程:

    val timeInMillis = System.currentTimeMillis()
ThreadUtils.startNewThread(Runnable {
for (i in 1..5) {
val t = Thread(Runnable {
Thread.sleep(50)
var a = i
kotlin.io.println(Thread.currentThread().name + "|" + "a=$a")
Thread.sleep(200)
for (j in 1..5) {
a *= j
Thread.sleep(100)
kotlin.io.println(Thread.currentThread().name + "|" + "$a*$j=$a")
}
kotlin.io.println(Thread.currentThread().name + "|TaskDurationInMillis = " + (System.currentTimeMillis() - timeInMillis))
})
t.start()
t.join()
}
})

结果就是:

Thread-1|a=1
Thread-1|1*1=1
Thread-1|2*2=2
Thread-1|6*3=6
Thread-1|24*4=24
Thread-1|120*5=120
Thread-1|TaskDurationInMillis = 815
Thread-2|a=2
Thread-2|2*1=2
Thread-2|4*2=4
Thread-2|12*3=12
Thread-2|48*4=48
Thread-2|240*5=240
Thread-2|TaskDurationInMillis = 1568
Thread-3|a=3
Thread-3|3*1=3
Thread-3|6*2=6
Thread-3|18*3=18
Thread-3|72*4=72
Thread-3|360*5=360
Thread-3|TaskDurationInMillis = 2323
Thread-4|a=4
Thread-4|4*1=4
Thread-4|8*2=8
Thread-4|24*3=24
Thread-4|96*4=96
Thread-4|480*5=480
Thread-4|TaskDurationInMillis = 3078
Thread-5|a=5
Thread-5|5*1=5
Thread-5|10*2=10
Thread-5|30*3=30
Thread-5|120*4=120
Thread-5|600*5=600
Thread-5|TaskDurationInMillis = 3833

当我们使用 join的时候,很明显:

  1. 线程按顺序运行。
  2. 第一个样本需要765毫秒,而第二个样本需要3833毫秒。

我们防止阻塞其他线程的解决方案是创建一个 ArrayList:

val threads = ArrayList<Thread>()

现在,当我们想要启动一个新线程时,我们最常把它添加到 ArrayList:

addThreadToArray(
ThreadUtils.startNewThread(Runnable {
...
})
)

addThreadToArray功能:

@Synchronized
fun addThreadToArray(th: Thread) {
threads.add(th)
}

startNewThread的功能:

fun startNewThread(runnable: Runnable) : Thread {
val th = Thread(runnable)
th.isDaemon = false
th.priority = Thread.MAX_PRIORITY
th.start()
return th
}

检查所有需要的线程的完成情况如下:

val notAliveThreads = ArrayList<Thread>()
for (t in threads)
if (!t.isAlive)
notAliveThreads.add(t)
threads.removeAll(notAliveThreads)
if (threads.size == 0){
// The size is 0 -> there is no alive threads.
}

问题在于:

for(i = 0; i < threads.length; i++)
threads[i].join();

就是 threads[i + 1]永远不能在 threads[i]之前加入。 除了“锁定”的,所有的解决方案都缺乏这一点。

这里还没有人提到 ExecutorCompletionService,它允许根据线程/任务的完成顺序进行连接:

public class ExecutorCompletionService<V> extends Object implements CompletionService<V>

使用提供的 Executor执行任务的 CompletionService。这个类安排提交的任务在完成后放在一个可以使用 take 访问的队列中。该类足够轻量级,适合在处理任务组时暂时使用。

使用示例。

假设您有一组解决某个问题的解决方案,每个解决方案返回一个类型为 Result 的值,并希望在某个方法 use(Result r)中并发运行它们,处理每个解决方案返回非空值的结果。你可以这样写:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException {
CompletionService<Result> cs = new ExecutorCompletionService<>(e);
solvers.forEach(cs::submit);
for (int i = solvers.size(); i > 0; i--) {
Result r = cs.take().get();
if (r != null)
use(r);
}
}

相反,假设您希望使用任务集的第一个非空结果,忽略任何遇到异常的任务,并在第一个任务准备好时取消所有其他任务:

void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
CompletionService<Result> cs = new ExecutorCompletionService<>(e);
int n = solvers.size();
List<Future<Result>> futures = new ArrayList<>(n);
Result result = null;
try {
solvers.forEach(solver -> futures.add(cs.submit(solver)));
for (int i = n; i > 0; i--) {
try {
Result r = cs.take().get();
if (r != null) {
result = r;
break;
}
} catch (ExecutionException ignore) {}
}
} finally {
futures.forEach(future -> future.cancel(true));
}


if (result != null)
use(result);
}

Since: 1.5 (!)

Assuming use(r) (of Example 1) also asynchronous, we had a big advantage. #