等待所有线程在 Java 中完成它们的工作

我正在编写一个应用程序,它有5个线程,可以同时从 Web 获取一些信息,并在一个缓冲类中填充5个不同的字段。
我需要验证缓冲区数据,并在所有线程完成其工作时将其存储在数据库中。
我如何做到这一点(当所有线程完成其工作时收到警报) ?

216128 次浏览

You can join to the threads. The join blocks until the thread completes.

for (Thread thread : threads) {
thread.join();
}

请注意,join抛出一个 InterruptedException。如果发生这种情况,您必须决定要做什么(例如,尝试取消其他线程以防止完成不必要的工作)。

你有

for (Thread t : new Thread[] { th1, th2, th3, th4, th5 })
t.join()

在这个 for 循环之后,可以确保所有线程都完成了它们的作业。

可以为此使用 Threadf # join方法。

将线程对象存储到某个集合中(如 List 或 Set) ,然后在线程启动后循环遍历该集合,并在线程上调用 加入()

执行器服务可用于管理包括状态和完成在内的多个线程

除了其他人建议的 Thread.join()之外,java 5还引入了执行器框架。在那里您不能使用 Thread对象。相反,您将 CallableRunnable对象提交给执行者。有一个特殊的执行器,用于执行多个任务,并按顺序返回结果。这就是 ExecutorCompletionService:

ExecutorCompletionService executor;
for (..) {
executor.submit(Executors.callable(yourRunnable));
}

然后,您可以重复调用 take(),直到没有更多的 Future<?>对象要返回,这意味着所有这些对象都已完成。


另一件可能相关的事情,取决于您的场景是 CyclicBarrier

一个同步辅助工具,允许一组线程相互等待对方到达一个公共屏障点。CyclicBarriers 在涉及固定大小的线程的程序中很有用,这些线程偶尔必须相互等待。这个屏障被称为循环屏障,因为它可以在释放等待的线程之后重用。

我采用的方法是使用 执行服务来管理线程池。

ExecutorService es = Executors.newCachedThreadPool();
for(int i=0;i<5;i++)
es.execute(new Runnable() { /*  your task */ });
es.shutdown();
boolean finished = es.awaitTermination(1, TimeUnit.MINUTES);
// all tasks have finished or the time has been reached.

另一种可能性是 CountDownLatch对象,这对于简单的情况很有用: 因为您事先知道线程的数量,所以可以使用相关的计数对其进行初始化,并将对象的引用传递给每个线程。
完成任务后,每个线程调用 CountDownLatch.countDown(),从而减少内部计数器。在启动所有其他线程之后,主线程应该执行 CountDownLatch.await()阻塞调用。一旦内部计数器达到0,它就会被释放。

请注意,使用这个对象,也可以抛出 InterruptedException

虽然与 OP 的问题无关,但是如果您对与一个线程的同步(更确切地说,是一个约会)感兴趣,那么您可以使用 交换器

在我的例子中,我需要暂停父线程,直到子线程完成某些操作,例如完成初始化。CountDownLatch也可以很好地工作。

试试这个,会有用的。

  Thread[] threads = new Thread[10];


List<Thread> allThreads = new ArrayList<Thread>();


for(Thread thread : threads){


if(null != thread){


if(thread.isAlive()){


allThreads.add(thread);


}


}


}


while(!allThreads.isEmpty()){


Iterator<Thread> ite = allThreads.iterator();


while(ite.hasNext()){


Thread thread = ite.next();


if(!thread.isAlive()){


ite.remove();
}


}


}

在你的主线程中使用它: while (! Executor.isTerminated ()) ; 在从执行器服务启动所有线程之后放置这行代码。这只会在执行程序启动的所有线程完成之后才启动主线程。请确保在上面的循环之前调用 Executor.close () ;。

看看各种解决方案。

  1. 在 Java 的早期版本中已经引入了 join()API。自 JDK 1.5发布以来,这个 同时包提供了一些不错的替代方案。

  2. ExecutorService # invekAll ()

    执行给定的任务,当所有事情都完成时返回一个持有其状态和结果的期货列表。

    请参考这个相关的 SE 问题作为代码示例:

    如何使用 invekAll ()让所有线程池执行它们的任务?

  3. CountDownLatch

    允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。

    使用给定的计数初始化 CountDownLatch。由于调用 countDown()方法,等待方法会一直阻塞,直到当前计数达到零为止,然后释放所有等待线程,并立即释放任何后续等待返回的调用。这是一个一次性现象——计数无法重置。如果需要重置计数的版本,可以考虑使用 循环屏障

    有关 CountDownLatch的用法,请参考这个问题

    如何等待一个线程,产生它自己的线程?

  4. 执行者中的 ForkJoinPool 或 NewWorkStealingPool ()

  5. 遍历在提交到 ExecutorService之后创建的所有 未来对象

我也遇到过类似的问题,最终使用了 Java8并行流。

requestList.parallelStream().forEach(req -> makeRequest(req));

超级简单易读。 在幕后,它使用默认 JVM 的 fork 连接池,这意味着它将等待所有线程完成后才继续。对我来说,这是一个简洁的解决方案,因为它是我的应用程序中唯一的并行流。如果您有多个并行流同时运行,请阅读下面的链接。

More information about parallel streams 给你.

现有的答案说可以 join()每个线程。

但是有几种获得线程数组/列表的方法:

  • 在创建时将线程添加到列表中。
  • 使用 ThreadGroup管理线程。

下面的代码将使用 ThreadGruop方法。它首先创建一个组,然后在创建每个线程时指定构造函数中的组,之后可以通过 ThreadGroup.enumerate()获取线程数组


密码

SyncBlockLearn.java

import org.testng.Assert;
import org.testng.annotations.Test;


/**
* synchronized block - learn,
*
* @author eric
* @date Apr 20, 2015 1:37:11 PM
*/
public class SyncBlockLearn {
private static final int TD_COUNT = 5; // thread count
private static final int ROUND_PER_THREAD = 100; // round for each thread,
private static final long INC_DELAY = 10; // delay of each increase,


// sync block test,
@Test
public void syncBlockTest() throws InterruptedException {
Counter ct = new Counter();
ThreadGroup tg = new ThreadGroup("runner");


for (int i = 0; i < TD_COUNT; i++) {
new Thread(tg, ct, "t-" + i).start();
}


Thread[] tArr = new Thread[TD_COUNT];
tg.enumerate(tArr); // get threads,


// wait all runner to finish,
for (Thread t : tArr) {
t.join();
}


System.out.printf("\nfinal count: %d\n", ct.getCount());
Assert.assertEquals(ct.getCount(), TD_COUNT * ROUND_PER_THREAD);
}


static class Counter implements Runnable {
private final Object lkOn = new Object(); // the object to lock on,
private int count = 0;


@Override
public void run() {
System.out.printf("[%s] begin\n", Thread.currentThread().getName());


for (int i = 0; i < ROUND_PER_THREAD; i++) {
synchronized (lkOn) {
System.out.printf("[%s] [%d] inc to: %d\n", Thread.currentThread().getName(), i, ++count);
}
try {
Thread.sleep(INC_DELAY); // wait a while,
} catch (InterruptedException e) {
e.printStackTrace();
}
}


System.out.printf("[%s] end\n", Thread.currentThread().getName());
}


public int getCount() {
return count;
}
}
}

主线程将等待组中的所有线程完成。

我创建了一个小型 helper 方法来等待几个 Threads 完成:

public static void waitForThreadsToFinish(Thread... threads) {
try {
for (Thread thread : threads) {
thread.join();
}
}
catch (InterruptedException e) {
e.printStackTrace();
}
}

Wait/block the Thread Main until some other threads complete their work.

正如 @Ravindra babu所说,它可以通过多种方式实现,但是要用实例来说明。

  • Lang. Thread.加入() < sup > 从1.0开始

    public static void joiningThreads() throws InterruptedException {
    Thread t1 = new Thread( new LatchTask(1, null), "T1" );
    Thread t2 = new Thread( new LatchTask(7, null), "T2" );
    Thread t3 = new Thread( new LatchTask(5, null), "T3" );
    Thread t4 = new Thread( new LatchTask(2, null), "T4" );
    
    
    // Start all the threads
    t1.start();
    t2.start();
    t3.start();
    t4.start();
    
    
    // Wait till all threads completes
    t1.join();
    t2.join();
    t3.join();
    t4.join();
    }
    
  • java.util.concurrent.CountDownLatch Since:1.5

    • .countDown() « Decrements the count of the latch group.
    • .await() « The await methods block until the current count reaches zero.

    If you created latchGroupCount = 4 then countDown() should be called 4 times to make count 0. So, that await() will release the blocking threads.

    public static void latchThreads() throws InterruptedException {
    int latchGroupCount = 4;
    CountDownLatch latch = new CountDownLatch(latchGroupCount);
    Thread t1 = new Thread( new LatchTask(1, latch), "T1" );
    Thread t2 = new Thread( new LatchTask(7, latch), "T2" );
    Thread t3 = new Thread( new LatchTask(5, latch), "T3" );
    Thread t4 = new Thread( new LatchTask(2, latch), "T4" );
    
    
    t1.start();
    t2.start();
    t3.start();
    t4.start();
    
    
    //latch.countDown();
    
    
    latch.await(); // block until latchGroupCount is 0.
    }
    

Example code of Threaded class LatchTask. To test the approach use joiningThreads(); and latchThreads(); from main method.

class LatchTask extends Thread {
CountDownLatch latch;
int iterations = 10;
public LatchTask(int iterations, CountDownLatch latch) {
this.iterations = iterations;
this.latch = latch;
}


@Override
public void run() {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " : Started Task...");


for (int i = 0; i < iterations; i++) {
System.out.println(threadName + " : " + i);
MainThread_Wait_TillWorkerThreadsComplete.sleep(1);
}
System.out.println(threadName + " : Completed Task");
// countDown() « Decrements the count of the latch group.
if(latch != null)
latch.countDown();
}
}
  • CyclicBarriers 一个同步辅助工具,允许一组线程相互等待到达一个公共屏障点。CyclicBarriers 在涉及固定大小的线程的程序中很有用,这些线程偶尔必须相互等待。这个屏障被称为循环屏障,因为它可以在释放等待的线程之后重用。
    CyclicBarrier barrier = new CyclicBarrier(3);
    barrier.await();
    
    例如,引用这个 Concurrent_ParallelNotifyies类。

  • Executer framework: we can use ExecutorService to create a thread pool, and tracks the progress of the asynchronous tasks with Future.

    • 返回 Future Object 的 submit(Runnable)submit(Callable)。通过使用 future.get()函数,我们可以阻塞主线程,直到工作线程完成其工作。

    • 返回 Future 对象的列表,通过该列表可以获得每个 Callable 的执行结果。

查找使用 Interfaces Runnable,Callable with Execator Framework 的示例


@See also

我有类似的情况,我必须等待,直到所有子线程完成其执行,然后只有我可以得到每个状态的结果。.因此,我需要等待,直到所有子线程完成。

下面是我的代码,我做了多线程使用

 public static void main(String[] args) {
List<RunnerPojo> testList = ExcelObject.getTestStepsList();//.parallelStream().collect(Collectors.toList());
int threadCount = ConfigFileReader.getInstance().readConfig().getParallelThreadCount();
System.out.println("Thread count is : =========  " + threadCount); // 5


ExecutorService threadExecutor = new DriverScript().threadExecutor(testList, threadCount);




boolean isProcessCompleted = waitUntilCondition(() -> threadExecutor.isTerminated()); // Here i used waitUntil condition


if (isProcessCompleted) {
testList.forEach(x -> {
System.out.println("Test Name: " + x.getTestCaseId());
System.out.println("Test Status : " + x.getStatus());
System.out.println("======= Test Steps ===== ");


x.getTestStepsList().forEach(y -> {
System.out.println("Step Name: " + y.getDescription());
System.out.println("Test caseId : " + y.getTestCaseId());
System.out.println("Step Status: " + y.getResult());
System.out.println("\n ============ ==========");
});
});
}

下面的方法用于并行处理的列表分配

// This method will split my list and run in a parallel process with mutliple threads
private ExecutorService threadExecutor(List<RunnerPojo> testList, int threadSize) {
ExecutorService exec = Executors.newFixedThreadPool(threadSize);
testList.forEach(tests -> {
exec.submit(() -> {
driverScript(tests);
});
});
exec.shutdown();
return exec;
}

这是我的 wait until 方法: 在这里您可以等待,直到您的条件在 do while 循环中满足。对我来说,我等待的是最大限度的暂停。 这将不断检查,直到你的 threadExecutor.isTerminated()true与轮询期5秒。

    static boolean waitUntilCondition(Supplier<Boolean> function) {
Double timer = 0.0;
Double maxTimeOut = 20.0;


boolean isFound;
do {
isFound = function.get();
if (isFound) {
break;
} else {
try {
Thread.sleep(5000); // Sleeping for 5 sec (main thread will sleep for 5 sec)
} catch (InterruptedException e) {
e.printStackTrace();
}
timer++;
System.out.println("Waiting for condition to be true .. waited .." + timer * 5 + " sec.");
}
} while (timer < maxTimeOut + 1.0);


return isFound;
}