灵活的倒计时闩锁? [英] Flexible CountDownLatch?
问题描述
我现在遇到过两次问题,生产者线程生成 N 个工作项,将它们提交给 ExecutorService
,然后需要等到所有 N 个项目都处理完毕.
I have encountered a problem twice now whereby a producer thread produces N work items, submits them to an ExecutorService
and then needs to wait until all N items have been processed.
注意事项
- N 事先未知.如果是这样,我会简单地创建一个
CountDownLatch
然后让生产者线程await()
直到所有工作完成. - 使用
CompletionService
是不合适的,因为虽然我的生产者线程需要阻塞(即通过调用take()
),但没有信号表明所有工作都在完成,使生产者线程停止等待.
- N is not known in advance. If it were I would simply create a
CountDownLatch
and then have producer threadawait()
until all work was complete. - Using a
CompletionService
is inappropriate because although my producer thread needs to block (i.e. by callingtake()
) there's no way of signalling that all work is complete, to cause the producer thread to stop waiting.
我目前最喜欢的解决方案是使用整数计数器,并在提交工作项时递增,并在处理工作项时递减.在提交所有 N 个任务之后,我的生产者线程将需要等待一个锁,在收到通知时检查 counter == 0
是否.消费者线程将需要通知生产者,如果它已经减少了计数器并且新值为 0.
My current favoured solution is to use an integer counter, and to increment this whenever an item of work is submitted and to decrement it when a work item is processed. Following the subsmission of all N tasks my producer thread will need to wait on a lock, checking whether counter == 0
whenever it is notified. The consumer thread(s) will need to notify the producer if it has decremented the counter and the new value is 0.
是否有更好的方法来解决这个问题,或者在 java.util.concurrent
中是否有合适的构造我应该使用而不是滚动我自己的"?
Is there a better approach to this problem or is there a suitable construct in java.util.concurrent
I should be using rather than "rolling my own"?
提前致谢.
推荐答案
java.util.concurrent.Phaser
看起来很适合你.它计划在 Java 7 中发布,但最稳定的版本可以在 jsr166 的兴趣小组网站.
移相器是一个光荣的循环屏障.您可以注册 N 个派对,当您准备好在特定阶段等待他们的进展.
The phaser is a glorified Cyclic Barrier. You can register N number of parties and when youre ready await their advance at the specific phase.
一个关于它如何工作的简单例子:
A quick example on how it would work:
final Phaser phaser = new Phaser();
public Runnable getRunnable(){
return new Runnable(){
public void run(){
..do stuff...
phaser.arriveAndDeregister();
}
};
}
public void doWork(){
phaser.register();//register self
for(int i=0 ; i < N; i++){
phaser.register(); // register this task prior to execution
executor.submit( getRunnable());
}
phaser.arriveAndAwaitAdvance();
}
这篇关于灵活的倒计时闩锁?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!