阻塞队列和多线程使用者,如何知道何时停止

我有一个单线程生产者,它创建一些任务对象,然后添加到一个 ArrayBlockingQueue(这是固定大小)。

我还启动了一个多线程使用者。这是作为固定线程池(Executors.newFixedThreadPool(threadCount);)构建的。然后,我将一些 ConsumerWorker 实例提交给这个线程池,每个 ConsumerWorker 都有一个对上述 ArrayBlockingQueue 实例的引用。

每个这样的 Worker 将在队列上执行 take()并处理任务。

我的问题是,什么是最好的方式让一个工人知道什么时候不会有更多的工作要做。换句话说,我如何告诉 Workers 生产者已经完成了对队列的添加,从这一点开始,每个 worker 应该在看到 Queue 为空时停止。

我现在得到的是一个设置,在这个设置中,我的 Producer 被初始化为一个回调函数,当他完成它的工作(向队列添加东西)时,这个回调函数被触发。我还保留了我创建并提交给 ThreadPool 的所有 ConsumerWorkers 的列表。当 Producer Callback 告诉我制作人已经完成时,我可以将此告诉每个工作人员。此时,他们应该简单地继续检查队列是否为空,当队列变为空时,他们应该停止,从而允许我优雅地关闭 ExecutorService 线程池。是这样的

public class ConsumerWorker implements Runnable{


private BlockingQueue<Produced> inputQueue;
private volatile boolean isRunning = true;


public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}


@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning || !inputQueue.isEmpty()) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}


//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void setRunning(boolean isRunning) {
this.isRunning = isRunning;
}

}

这里的问题是,我有一个明显的竞争条件,有时生产者将结束,发出信号,并且 ConsumerWorkers 将在消耗队列中的所有内容之前停止。

我的问题是什么是最好的方式来同步这一点,以便它所有的工作正常?我是否应该同步整个部分,它检查生产者是否正在运行加上队列是否为空加上从队列中的一个块(在队列对象上)获取某些内容?我是否应该同步 ConsumerWorker 实例上 isRunning布尔值的更新?还有别的建议吗?

更新,下面是我最终使用的工作实现:

public class ConsumerWorker implements Runnable{


private BlockingQueue<Produced> inputQueue;


private final static Produced POISON = new Produced(-1);


public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}


@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(true) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Produced queueElement = inputQueue.take();
Thread.sleep(new Random().nextInt(100));
if(queueElement==POISON) {
break;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Consumer "+Thread.currentThread().getName()+" END");
}
}


//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void stopRunning() {
try {
inputQueue.put(POISON);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

这主要是受 JohnVint 下面的回答的启发,只有一些小的修改。

= = = 由于@vendhan 的评论而更新。

谢谢你的观察。您是对的,这个问题中的第一个代码片段(在其他问题中)包含了一个 while(isRunning || !inputQueue.isEmpty())实际上没有意义的代码片段。

在我实际的最终实现中,我做了一些更接近于你建议的用“ & &”(和)代替“ | |”(或)的事情,在这个意义上,每个工作者(消费者)现在只检查他从列表中获得的元素是否是毒丸,如果是,停止(所以理论上我们可以说,工作者必须运行,队列必须不是空的)。

61102 次浏览

您应该从队列继续到 take()。你可以用一粒毒药来告诉工人停下来。例如:

private final Object POISON_PILL = new Object();


@Override
public void run() {
//worker loop keeps taking en element from the queue as long as the producer is still running or as
//long as the queue is not empty:
while(isRunning) {
System.out.println("Consumer "+Thread.currentThread().getName()+" START");
try {
Object queueElement = inputQueue.take();
if(queueElement == POISON_PILL) {
inputQueue.add(POISON_PILL);//notify other threads to stop
return;
}
//process queueElement
} catch (Exception e) {
e.printStackTrace();
}
}
}


//this is used to signal from the main thread that he producer has finished adding stuff to the queue
public void finish() {
//you can also clear here if you wanted
isRunning = false;
inputQueue.add(POISON_PILL);
}

您可以使用许多策略,但是其中一个简单的策略是拥有一个任务子类,该子类标志着任务的结束。制片人不会直接发送这个信号。相反,它对此任务子类的实例进行排队。当您的一个使用者完成这个任务并执行它时,将导致发送信号。

我会给工人们发送一个特殊的工作包,告诉他们应该停工:

public class ConsumerWorker implements Runnable{


private static final Produced DONE = new Produced();


private BlockingQueue<Produced> inputQueue;


public ConsumerWorker(BlockingQueue<Produced> inputQueue) {
this.inputQueue = inputQueue;
}


@Override
public void run() {
for (;;) {
try {
Produced item = inputQueue.take();
if (item == DONE) {
inputQueue.add(item); // keep in the queue so all workers stop
break;
}
// process `item`
} catch (Exception e) {
e.printStackTrace();
}
}
}

}

要停止工作线程,只需将 ConsumerWorker.DONE添加到队列中。

在尝试从队列中检索元素的代码块中,使用 poll(time,unit)而不是 take()

try {
Object queueElement = inputQueue.poll(timeout,unit);
//process queueElement
} catch (InterruptedException e) {
if(!isRunning && queue.isEmpty())
return ;
}

通过指定适当的超时值,可以确保线程不会继续阻塞,以防出现

  1. isRunning是真的
  2. 队列变为空,因此线程进入阻塞等待(如果使用 take())
  3. isRunning设置为 false

我必须使用一个多线程生产者和一个多线程使用者。 我最终得到了一个 Scheduler -- N Producers -- M Consumers方案,每两个通信通过一个队列(总共两个队列)。调度器用生成数据的请求填充第一个队列,然后用 N 个“毒丸”填充它。有一个活动生产者计数器(原子整数) ,最后一个接收最后一个毒丸的生产者将 M 毒丸发送到消费者队列。

我们不能使用一个 CountDownLatch做到这一点,其中的大小是在生产者的记录数量。并且每个消费者将 countDown经过处理后记录。当所有任务完成时,它与 awaits()方法交叉。那就阻止所有的消费者。处理所有记录。