灵活CountDownLatch? [英] Flexible CountDownLatch?
问题描述
我遇到了一个问题,现在两次,一个生产者线程产生N个工作项,提交到 ExecutorService
,然后需要等待,直到所有N个项目都已处理。
I have encountered a problem twice now whereby a producer thread produces N work items, submits them to an ExecutorService
and then needs to wait until all N items have been processed.
注意事项
- 不提前知道。如果是,我只需创建一个
CountDownLatch
,然后在所有工作完成之前,使用生成器线程await()
/ li>
- 使用
CompletionService
是不合适的,因为虽然我的生产者线程需要阻塞(即通过调用/ code>)无法通知所有工作完成,以使生产者线程停止等待。
- N is not known in advance. If it were I would simply create a
CountDownLatch
and then have producer threadawait()
until all work was complete. - Using a
CompletionService
is inappropriate because although my producer thread needs to block (i.e. by callingtake()
) there's no way of signalling that all work is complete, to cause the producer thread to stop waiting.
我当前最喜欢的解决方案是使用整数计数器,并且在提交工作项时增量,当工作提交时减少项目被处理。在所有N个任务的提交后,我的生产者线程需要等待一个锁,检查 counter == 0
每当它被通知。如果消费者线程已经递减计数器并且新值为0,则消费者线程将需要通知生产者。
My current favoured solution is to use an integer counter, and to increment this whenever an item of work is submitted and to decrement it when a work item is processed. Following the subsmission of all N tasks my producer thread will need to wait on a lock, checking whether counter == 0
whenever it is notified. The consumer thread(s) will need to notify the producer if it has decremented the counter and the new value is 0.
是否有更好的方法来解决这个问题,或者在 java.util.concurrent
中应该使用合适的结构,而不是滚动我自己的?
Is there a better approach to this problem or is there a suitable construct in java.util.concurrent
I should be using rather than "rolling my own"?
先感谢。
推荐答案
java.util.concurrent.Phaser
看起来像效果很好您。它计划在Java 7中发布,但最稳定的版本可以在 jsr166 的兴趣小组网站。
phaser是一个荣誉的循环障碍。
The phaser is a glorified Cyclic Barrier. You can register N number of parties and when youre ready await their advance at the specific phase.
有关如何工作的一个简单示例:
A quick example on how it would work:
final Phaser phaser = new Phaser();
public Runnable getRunnable(){
return new Runnable(){
public void run(){
..do stuff...
phaser.arriveAndDeregister();
}
};
}
public void doWork(){
phaser.register();//register self
for(int i=0 ; i < N; i++){
phaser.register(); // register this task prior to execution
executor.submit( getRunnable());
}
phaser.arriveAndAwaitAdvance();
}
这篇关于灵活CountDownLatch?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!