CountDownLatch是如何在Java多线程中使用的?

有人能帮助我理解什么是Java CountDownLatch和什么时候使用它吗?

我不太清楚这个项目是怎么运作的。据我所知,这三个线程同时开始,每个线程将在3000ms后调用CountDownLatch。所以倒数会一个一个递减。锁存变为零后,程序打印“完成”。也许我理解的方式是错误的。

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


class Processor implements Runnable {
private CountDownLatch latch;


public Processor(CountDownLatch latch) {
this.latch = latch;
}


public void run() {
System.out.println("Started.");


try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}


latch.countDown();
}
}

// -----------------------------------------------------

public class App {


public static void main(String[] args) {


CountDownLatch latch = new CountDownLatch(3); // coundown from 3 to 0


ExecutorService executor = Executors.newFixedThreadPool(3); // 3 Threads in pool


for(int i=0; i < 3; i++) {
executor.submit(new Processor(latch)); // ref to latch. each time call new Processes latch will count down by 1
}


try {
latch.await();  // wait until latch counted down to 0
} catch (InterruptedException e) {
e.printStackTrace();
}


System.out.println("Completed.");
}


}
138694 次浏览

是的,你理解正确。 CountDownLatch工作在闩锁原理,主线程将等待,直到门打开。一个线程等待n线程,该线程是在创建CountDownLatch.

. 0时指定的

任何调用CountDownLatch.await()的线程(通常是应用程序的主线程)都将等待count为零或被另一个线程中断。所有其他线程都需要在完成或准备就绪后调用CountDownLatch.countDown()进行倒数。

只要count达到0,等待线程就会继续。CountDownLatch的缺点/优点之一是它不可重用:一旦count达到0,你就不能再使用CountDownLatch了。

编辑:

当一个线程(如主线程)需要等待一个或多个线程完成后才能继续处理时,使用CountDownLatch

在Java中使用CountDownLatch的一个经典例子是一个服务器端核心Java应用程序,它使用服务架构,其中多个服务由多个线程提供,应用程序只有在所有服务都成功启动后才能开始处理。

< p >注: OP的问题有一个非常简单的例子,所以我没有包括一个

NikolaB解释得很好,但是举例有助于理解,所以这里有一个简单的例子…

 import java.util.concurrent.*;




public class CountDownLatchExample {


public static class ProcessThread implements Runnable {


CountDownLatch latch;
long workDuration;
String name;


public ProcessThread(String name, CountDownLatch latch, long duration){
this.name= name;
this.latch = latch;
this.workDuration = duration;
}




public void run() {
try {
System.out.println(name +" Processing Something for "+ workDuration/1000 + " Seconds");
Thread.sleep(workDuration);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(name+ "completed its works");
//when task finished.. count down the latch count...


// basically this is same as calling lock object notify(), and object here is latch
latch.countDown();
}
}




public static void main(String[] args) {
// Parent thread creating a latch object
CountDownLatch latch = new CountDownLatch(3);


new Thread(new ProcessThread("Worker1",latch, 2000)).start(); // time in millis.. 2 secs
new Thread(new ProcessThread("Worker2",latch, 6000)).start();//6 secs
new Thread(new ProcessThread("Worker3",latch, 4000)).start();//4 secs




System.out.println("waiting for Children processes to complete....");
try {
//current thread will get notified if all chidren's are done
// and thread will resume from wait() mode.
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}


System.out.println("All Process Completed....");


System.out.println("Parent Thread Resuming work....");






}
}

在Java中,CountDownLatch是一种同步器,它允许一个Thread在开始处理之前等待一个或多个Thread

CountDownLatch工作在闩锁原理,线程将等待,直到门打开。一个线程等待创建CountDownLatch时指定的n线程数。

例如final CountDownLatch latch = new CountDownLatch(3);

这里我们将计数器设置为3。

任何调用CountDownLatch.await()的线程(通常是应用程序主线程)都将等待count为零或被另一个Thread中断。所有其他线程都需要在完成或准备工作时调用CountDownLatch.countDown()来进行倒数。一旦count达到0,等待的Thread就开始运行。

这里的计数是通过CountDownLatch.countDown()方法递减的。

调用await()方法的Thread将一直等待,直到初始计数为零。

要使计数为0,其他线程需要调用countDown()方法。 一旦计数为零,调用await()方法的线程将恢复(开始执行)

CountDownLatch的缺点是它不可重用:一旦计数为零,它就不再可用。

CoundDownLatch使您能够让一个线程等待所有其他线程执行完毕。

伪代码可以是:

// Main thread starts
// Create CountDownLatch for N threads
// Create and start N threads
// Main thread waits on latch
// N threads completes there tasks are returns
// Main thread resume execution

一个很好的例子是使用Java Simple Serial Connector访问串行端口。通常情况下,您将向端口写入一些内容,然后异步地在另一个线程上,设备将对SerialPortEventListener进行响应。通常,您会希望在写入端口后暂停以等待响应。手动处理此场景的线程锁非常棘手,但是使用Countdownlatch很容易。在你认为你可以用另一种方式去做之前,要小心你从未想过的竞争条件!!

伪代码:

< >之前

CountDownLatch latch;
void writeData() {
latch = new CountDownLatch(1);
serialPort.writeBytes(sb.toString().getBytes())
try {
latch.await(4, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}
}
class SerialPortReader implements SerialPortEventListener {
public void serialEvent(SerialPortEvent event) {
if(event.isRXCHAR()){//If data is available
byte buffer[] = serialPort.readBytes(event.getEventValue());
latch.countDown();
}
}
}

< / >之前

当我们想要等待多个线程完成其任务时,就会使用它。它类似于线程连接。

我们可以在哪里使用CountDownLatch

考虑这样一个场景,我们有一个需求,我们有三个线程“a”,“b”;和“;C"我们想开始线程"C"只有当"和“;B"线程完成或部分完成它们的任务。

它可以应用于真实的It场景

考虑这样一个场景,经理将模块划分到开发团队(a和B)之间,他希望仅在两个团队都完成任务时将模块分配给QA团队进行测试。

public class Manager {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
MyDevTeam teamDevA = new MyDevTeam(countDownLatch, "devA");
MyDevTeam teamDevB = new MyDevTeam(countDownLatch, "devB");
teamDevA.start();
teamDevB.start();
countDownLatch.await();
MyQATeam qa = new MyQATeam();
qa.start();
}
}


class MyDevTeam extends Thread {
CountDownLatch countDownLatch;
public MyDevTeam (CountDownLatch countDownLatch, String name) {
super(name);
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
System.out.println("Task assigned to development team " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println("Task finished by development team " + Thread.currentThread().getName());
this.countDownLatch.countDown();
}
}


class MyQATeam extends Thread {
@Override
public void run() {
System.out.println("Task assigned to QA team");
try {
Thread.sleep(2000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
System.out.println("Task finished by QA team");
}
}

以上代码的输出为:

分配给开发团队devB的任务

分配给开发团队devA的任务

任务由开发团队devB完成

由开发团队devA完成的任务

分配给QA团队的任务

QA团队完成的任务

这里等待()方法等待countdownlatch标志变为0,而倒计时()方法将countdownlatch标志减1。

JOIN限制: 上面的例子也可以通过JOIN实现,但是JOIN不能在两个场景中使用:

  1. 当我们使用ExecutorService而不是Thread类创建线程时。
  2. 修改上面的例子,经理希望在开发完成80%的任务后立即将代码移交给QA团队。这意味着CountDownLatch允许我们修改实现,可以用来等待另一个线程的部分执行。

如果你在调用latch.countDown()后添加一些调试,这可能会帮助你更好地理解它的行为。

latch.countDown();
System.out.println("DONE "+this.latch); // Add this debug

输出将显示Count正在递减。这个“计数”实际上是你已经启动的可运行任务(处理器对象)的数量,其中countDown()已经被调用,因此在主线程调用latch.await()时被阻塞。

DONE java.util.concurrent.CountDownLatch@70e69696[Count = 2]
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 1]
DONE java.util.concurrent.CountDownLatch@70e69696[Count = 0]

来自oracle关于CountDownLatch的文档:

一种同步辅助工具,允许一个或多个线程等待在其他线程中执行的一组操作完成。

CountDownLatch使用给定的计数进行初始化。由于countDown()方法的调用,await方法阻塞直到当前计数为零,在此之后所有等待的线程被释放,并且await的任何后续调用立即返回。这是一个一次性现象——计数不能重置。

CountDownLatch是一种通用的同步工具,可用于许多目的。

CountDownLatch被初始化为1的计数作为一个简单的开/关门闩或门:所有调用await的线程都在门处等待,直到被调用countDown()的线程打开。

CountDownLatch初始化为N可以用来让一个线程等待到N个线程完成某个操作,或者某个操作已经完成N次。

public void await()
throws InterruptedException

导致当前线程等待,直到锁存器计数到零,除非线程被中断。

如果当前计数为零,则此方法立即返回。

public void countDown()

减少锁存器的计数,如果计数为零,则释放所有等待线程。

如果当前计数大于零,则递减。如果新的计数为零,则所有等待的线程都将重新启用以进行线程调度。

解释你的例子。

  1. 你已经为latch变量设置count为3

    CountDownLatch latch = new CountDownLatch(3);
    
  2. You have passed this shared latch to Worker thread : Processor

  3. Three Runnable instances of Processor have been submitted to ExecutorService executor
  4. Main thread ( App ) is waiting for count to become zero with below statement

     latch.await();
    
  5. Processor thread sleeps for 3 seconds and then it decrements count value with latch.countDown()
  6. First Process instance will change latch count as 2 after it's completion due to latch.countDown().

  7. Second Process instance will change latch count as 1 after it's completion due to latch.countDown().

  8. Third Process instance will change latch count as 0 after it's completion due to latch.countDown().

  9. Zero count on latch causes main thread App to come out from await

  10. App program prints this output now : Completed

正如在JavaDoc (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html)中提到的,CountDownLatch是一个同步辅助工具,在Java 5中引入。这里的同步并不意味着限制对临界区的访问。而是对不同线程的动作进行排序。 通过CountDownLatch实现的同步类型类似于Join。 假设有一个线程“M”,它需要等待其他工作线程“T1”,“T2”,“T3”来完成它的任务 在Java 1.5之前,可以这样做,M运行以下代码

    T1.join();
T2.join();
T3.join();
上面的代码确保线程M在T1, T2, T3完成它的工作后恢复它的工作。T1, T2, T3可以按任意顺序完成它们的工作。 同样可以通过CountDownLatch实现,其中T1,T2, T3和线程M共享相同的CountDownLatch对象 “M”请求: countDownLatch.await();
其中“T1”,“T2”,“T3” countDownLatch.countdown();

join方法的一个缺点是M必须知道T1, T2, T3。如果后来添加了一个新的工作线程T4,那么M也必须知道它。使用CountDownLatch可以避免这种情况。 实现后的动作顺序为[T1,T2,T3](T1,T2,T3的顺序可以是任意)-> [M]

countDownLatch的最佳实时示例在此链接CountDownLatchExample中解释

这个来自Java文档的例子帮助我清楚地理解了这些概念:

class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);


for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();


doSomethingElse();            // don't let run yet
startSignal.countDown();      // let all threads proceed
doSomethingElse();
doneSignal.await();           // wait for all to finish
}
}


class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}


void doWork() { ... }
}

视觉解释:

enter image description here

显然,CountDownLatch允许一个线程(这里是Driver)等待一堆正在运行的线程(这里是Worker)完成它们的执行。

package practice;


import java.util.concurrent.CountDownLatch;


public class CountDownLatchExample {


public static void main(String[] args) throws InterruptedException {
CountDownLatch c= new CountDownLatch(3);  // need to decrements the count (3) to zero by calling countDown() method so that main thread will wake up after calling await() method
Task t = new Task(c);
Task t1 = new Task(c);
Task t2 = new Task(c);
t.start();
t1.start();
t2.start();
c.await(); // when count becomes zero main thread will wake up
System.out.println("This will print after count down latch count become zero");
}
}


class Task extends Thread{
CountDownLatch c;


public Task(CountDownLatch c) {
this.c = c;
}


@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(1000);
c.countDown();   // each thread decrement the count by one
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

最好的选项是CyclicBarrier,根据https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/CountDownLatch.html 看到:< / p >

CountDownLatch使用给定的计数进行初始化。由于调用了countDown()方法,await方法阻塞直到当前计数为零,在此之后所有等待的线程都被释放,并且await的任何后续调用立即返回。这是一个一次性现象——计数不能重置。如果您需要一个重置计数的版本,请考虑使用CyclicBarrier。