Java 并发: 倒计时锁存与循环屏障

我在读 并发 API的时候发现

  • CountDownLatch: 一种同步帮助,它允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。
  • CyclicBarrier: 一个同步辅助工具,允许一组线程彼此等待到达一个共同的障碍点。

在我看来,两者似乎是平等的,但我确信,其中还有更多内容。

例如,在 CoundownLatch, the countdown value could not be reset, that can happen in the case of CyclicBarrier中。

这两者之间还有什么区别吗?
什么是 use cases有人会想要重置倒计时的价值?

98516 次浏览

一个主要的区别是,循环屏障接受一个(可选的) Runnable 任务,该任务在满足公共屏障条件后运行。

它还允许您获得在屏障处等待的客户端数量以及触发屏障所需的数量。一旦触发屏障被重置,可以再次使用。

对于简单的用例-服务启动等等。.倒计时锁就可以了。循环障碍对于更复杂的协调任务是有用的。这种情况的一个例子是并行计算(其中涉及多个子任务) ,类似于 MapReduce

主要的区别在 CountdownLatch 的 Javadocs 文档中有明确的说明,即:

对象初始化 CountDownLatch 等待方法块 直到当前计数为零 由于调用 CountDown () 方法,之后所有的等待 线程被释放和任何 随后等待归还的请求 马上,这是一次性的 现象——计数不可能是 重置。如果你需要一个版本 重置计数,请考虑使用 循环屏障。

来源 1.6 Javadoc

还有一个区别。

使用 CyclicBarrier时,假设您指定触发屏障的等待线程的数量。如果指定5,则必须至少有5个线程来调用 await()

使用 CountDownLatch时,指定对 countDown()的调用次数,这将导致释放所有正在等待的线程。这意味着只能使用一个线程的 CountDownLatch

“你为什么要这么做?”你可能会说。假设您正在使用一个由执行回调的其他人编码的神秘 API。您希望某个线程等待某个回调被调用多次。您不知道将对哪些线程调用回调。在这种情况下,CountDownLatch是完美的,而我想不出任何使用 CyclicBarrier来实现它的方法(实际上,我可以,但是它涉及到超时... ... 呸!).

我只是希望 CountDownLatch可以重置!

对于 CyclicBlock,一旦所有子线程开始调用 Barrier.wait () ,就会在 Barrier.wait 中执行 Runnable。在每个子线程中等待的 barrier.wait 将花费不同的时间来完成,并且它们都在同一时间完成。

CountDownLatch 用于一次性同步。在使用 CountDownLatch 时,任何线程都可以随意多次调用 count Down ()。调用 wait ()的线程被阻塞,直到计数为零,因为其他未阻塞的线程调用 CountDown ()。用于 CountDownLatch 的 javadoc指出:

等待方法会阻塞,直到当前计数达到零为止 调用 count Down ()方法,然后所有等待的线程 释放和任何随后的等待返回的调用 马上。 ...

另一个典型的用法是把一个问题分成 N 个部分, 用一个 Runnable 来描述每个部分,Runnable 执行该部分并且 对锁存器进行倒计时,并将所有 Runnables 排队到一个 Execator。 当所有子部件完成后,协调线程将能够 穿过等待。(当线程必须在 这样,你就可以使用 CyclicBarrir。)

相反,循环屏障用于多个同步点,例如,如果一组线程正在运行循环/阶段计算,并且需要在开始下一个迭代/阶段之前进行同步。根据 循环障碍的 javadoc:

该屏障称为循环屏障,因为它可以在 等待线程被释放。

与 CountDownLatch 不同,等待()的每个调用都属于某个阶段,可能导致线程阻塞,直到属于该阶段的所有方都调用 wait ()。CyclicBlock 不支持显式 count Down ()操作。

还没有人提到的一点是,在 CyclicBarrier中,如果一个线程出现问题(超时、中断...) ,所有到达 await()的其他线程都会出现异常。参见 Javadoc:

CyclicBarrierException 对于失败的同步尝试使用了一个 all-or-none 中断模型: 如果一个线程由于中断、失败或超时而过早地离开了一个屏障点,那么所有在该屏障点等待的其他线程也会通过 BrokenBarrierException 异常离开(或者如果它们也在大约同一时间被中断,则通过 InterruptedException 异常离开)。

这个问题已经得到了充分的回答,但是我认为我可以通过发布一些代码来增加一些价值。

为了说明循环势垒的行为,我编写了一些示例代码。一旦屏障被倾斜,它是 自然而然地复位,以便它可以再次使用(因此它是“循环”)。当您运行程序时,请注意“ Let’s play”输出只有在屏障被打开后才会被触发。

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;


public class CyclicBarrierCycles {


static CyclicBarrier barrier;


public static void main(String[] args) throws InterruptedException {
barrier = new CyclicBarrier(3);


new Worker().start();
Thread.sleep(1000);
new Worker().start();
Thread.sleep(1000);
new Worker().start();
Thread.sleep(1000);


System.out.println("Barrier automatically resets.");


new Worker().start();
Thread.sleep(1000);
new Worker().start();
Thread.sleep(1000);
new Worker().start();
}


}




class Worker extends Thread {
@Override
public void run() {
try {
CyclicBarrierCycles.barrier.await();
System.out.println("Let's play.");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}

我认为 JavaDoc 已经明确地解释了这些差异。 大多数人都知道 CountDownLatch 不能重置,但 CyclicBlock 可以。但这不是唯一的区别,否则 Cyclic堰塞湖可以重命名为 ResetbleCountDownLatch。 我们应该从 JavaDoc 中描述的目标的角度来区分这些差异

CountDownLatch: 同步帮助,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。

一个同步辅助工具,允许一组线程等待彼此到达一个共同的屏障点。

在 count DownLatch 中,有一个或多个线程正在等待一组 其他线索完成。在这种情况下,有两种类型的线程,一种类型是等待,另一种类型是执行某些操作,在完成任务后,它们可能正在等待或者刚刚终止。

在 CyclicBlock 中,只有一种类型的线程,它们相互等待,它们是相等的。

一个明显的区别是,只有 N 个线程可以在一个循环中等待一个循环屏障 N 被释放。但是在 N 的 CountDownLatch 上可以等待无限数量的线程。倒数递减可以由一个线程完成 N 次或 N 线程一次,每次或组合。

CountDownLatch中,线程等待其他线程完成它们的执行。在 循环屏障中,工作线程相互等待对方完成它们的执行。

一旦计数达到零并且闩锁打开,你就不能重用同一个 CountDownLatch实例,另一方面 循环屏障可以通过重置屏障来重用,一旦屏障被打破。

简而言之,就是要了解两者之间关键的 功能性的差异:

public class CountDownLatch {
private Object mutex = new Object();
private int count;


public CountDownLatch(int count) {
this.count = count;
}


public void await() throws InterruptedException {
synchronized (mutex) {
while (count > 0) {
mutex.wait();
}
}
}


public void countDown() {
synchronized (mutex) {
if (--count == 0)
mutex.notifyAll();
}


}
}

还有

public class CyclicBarrier {
private Object mutex = new Object();
private int count;


public CyclicBarrier(int count) {
this.count = count;
}


public void await() throws InterruptedException {
synchronized (mutex) {
count--;
while(count > 0)
mutex.wait();
mutex.notifyAll();
}
}
}

当然,除了诸如非阻塞、定时等待、诊断以及在上述答案中已经详细解释过的所有特性之外。

然而,上面的类是完全功能性的,并且在提供的功能范围内等价于它们的对应名称。

另一方面,CountDownLatch的内部类子类 AQS,而 CyclicBarrier使用 ReentrantLock(我怀疑它可能是另一种方式,或者两者都可以使用 AQS,或者两者都使用 Lock ——没有任何性能效率的损失)

当我研究闩锁和自行车障碍时,我想到了这个比喻。 自行车障碍 : 想象一下,一家公司有一个会议室。为了开会,必须有一定数量的与会者来参加会议(使会议正式开始)。下面是正常与会者(员工)的代码

class MeetingAtendee implements Runnable {


CyclicBarrier myMeetingQuorumBarrier;


public MeetingAtendee(CyclicBarrier myMileStoneBarrier) {
this.myMeetingQuorumBarrier = myMileStoneBarrier;
}


@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + " i joined the meeting ...");
myMeetingQuorumBarrier.await();
System.out.println(Thread.currentThread().getName()+" finally meeting stared ...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
System.out.println("Meeting canceled! every body dance <by chic band!>");
}
}
}

员工参加会议,等别人来开会。如果会议被取消,他也会退出:)然后我们有老板,他不喜欢等待别人的出现,如果他失去了他的病人,他取消会议。

class MeetingAtendeeTheBoss implements Runnable {


CyclicBarrier myMeetingQuorumBarrier;


public MeetingAtendeeTheBoss(CyclicBarrier myMileStoneBarrier) {
this.myMeetingQuorumBarrier = myMileStoneBarrier;
}


@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "I am THE BOSS - i joined the meeting ...");
//boss dose not like to wait too much!! he/she waits for 2 seconds and we END the meeting
myMeetingQuorumBarrier.await(1,TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()+" finally meeting stared ...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
System.out.println("what WHO canceled The meeting");
} catch (TimeoutException e) {
System.out.println("These employees waste my time!!");
}
}
}

在平常的一天,员工来开会等待其他人的出现,如果一些与会者没有来,他们必须无限期地等待!在一些特别会议上,老板来了,他不喜欢等待。(5个人需要开会,但只有老板和一个热情的员工来了) ,所以他取消了会议(生气)

CyclicBarrier meetingAtendeeQuorum = new CyclicBarrier(5);
Thread atendeeThread = new Thread(new MeetingAtendee(meetingAtendeeQuorum));
Thread atendeeThreadBoss = new Thread(new MeetingAtendeeTheBoss(meetingAtendeeQuorum));
atendeeThread.start();
atendeeThreadBoss.start();

产出:

//Thread-1I am THE BOSS - i joined the meeting ...
// Thread-0 i joined the meeting ...
// These employees waste my time!!
// Meeting canceled! every body dance <by chic band!>

还有一种情况是,另一个外部线程(地震)取消了会议(调用重置方法)。在这种情况下,所有等待的线程都会被异常唤醒。

class NaturalDisasters implements Runnable {


CyclicBarrier someStupidMeetingAtendeeQuorum;


public NaturalDisasters(CyclicBarrier someStupidMeetingAtendeeQuorum) {
this.someStupidMeetingAtendeeQuorum = someStupidMeetingAtendeeQuorum;
}


void earthQuakeHappening(){
System.out.println("earth quaking.....");
someStupidMeetingAtendeeQuorum.reset();
}


@Override
public void run() {
earthQuakeHappening();
}
}

运行代码会导致有趣的输出:

// Thread-1I am THE BOSS - i joined the meeting ...
// Thread-0 i joined the meeting ...
// earth quaking.....
// what WHO canceled The meeting
// Meeting canceled! every body dance <by chic band!>

你也可以在会议室增加一个秘书,如果一个会议正在举行,她会记录下所有的事情,但是她不是会议的一部分:

class MeetingSecretary implements Runnable {


@Override
public void run() {
System.out.println("preparing meeting documents");
System.out.println("taking notes ...");
}
}

锁具 : 如果愤怒的老板想为公司客户举办展览会,一切都需要准备好(资源)。我们提供了一个待办事项清单每个工人(线程)剂量他的工作,我们检查待办事项清单(一些工人做油漆,其他人准备音响系统...)。当待办事项列表中的所有项目完成后(资源已提供) ,我们可以向客户敞开大门。

public class Visitor implements Runnable{


CountDownLatch exhibitonDoorlatch = null;


public Visitor (CountDownLatch latch) {
exhibitonDoorlatch  = latch;
}


public void run() {
try {
exhibitonDoorlatch .await();
} catch (InterruptedException e) {
e.printStackTrace();
}


System.out.println("customer visiting exebition");
}
}

工人们是如何准备展览的:

class Worker implements Runnable {


CountDownLatch myTodoItem = null;


public Worker(CountDownLatch latch) {
this.myTodoItem = latch;
}


public void run() {
System.out.println("doing my part of job ...");
System.out.println("My work is done! remove it from todo list");
myTodoItem.countDown();
}
}


CountDownLatch preperationTodoList = new CountDownLatch(3);


// exhibition preparation workers
Worker      electricalWorker      = new Worker(preperationTodoList);
Worker      paintingWorker      = new Worker(preperationTodoList);


// Exhibition Visitors
ExhibitionVisitor exhibitionVisitorA = new ExhibitionVisitor(preperationTodoList);
ExhibitionVisitor exhibitionVisitorB = new ExhibitionVisitor(preperationTodoList);
ExhibitionVisitor exhibitionVisitorC = new ExhibitionVisitor(preperationTodoList);


new Thread(electricalWorker).start();
new Thread(paintingWorker).start();


new Thread(exhibitionVisitorA).start();
new Thread(exhibitionVisitorB).start();
new Thread(exhibitionVisitorC).start();

CountDownLatch 是任何东西的倒计时; CyclicBlock 是仅针对线程的倒计时

假设有5个工人线程和一个托运人线程,当工人生产100个项目,托运人将发货出去。

对于 CountDownLatch,计数器可以位于 worker 或 item 上

周期障碍,计数器只能对工人

如果一个工作人员陷入无限的睡眠,使用 CountDownLatch 对项目,Shipper 可以发货; 但是,使用 CyclicBlock,Shipper 永远不能被调用

@ Kevin Lee 和@Jon 我尝试了可选的 Cyclic堰塞。看起来像是在开始和循环障碍被推倒之后运行的。这是代码和输出

静态循环屏障;

    public static void main(String[] args) throws InterruptedException {
barrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("I run in the beginning and after the CyclicBarrier is tipped");
}
});


new Worker().start();
Thread.sleep(1000);
new Worker().start();
Thread.sleep(1000);
new Worker().start();
Thread.sleep(1000);


System.out.println("Barrier automatically resets.");


new Worker().start();
Thread.sleep(1000);
new Worker().start();
Thread.sleep(1000);
new Worker().start();
}

输出

I run in the beginning and after the CyclicBarrier is tipped
Let's play.
Let's play.
Let's play.
Barrier automatically resets.
I run in the beginning and after the CyclicBarrier is tipped
Let's play.
Let's play.
Let's play.