Why does iterating over GetConsumingEnumerable() not fully empty the underlying blocking collection
Problem description
I have a quantifiable & repeatable problem using the Task Parallel Library, BlockingCollection<T>, ConcurrentQueue<T> & GetConsumingEnumerable while trying to create a simple pipeline.
In a nutshell, adding entries to a default BlockingCollection<T> (which under the hood relies on a ConcurrentQueue<T>) from one thread does not guarantee that they will be popped off the BlockingCollection<T> by another thread calling the GetConsumingEnumerable() method.
I've created a very simple WinForms application to reproduce/simulate this; it just prints integers to the screen.
- Timer1 is responsible for queueing up the work items. It uses a concurrent dictionary called _tracker so that it knows what it has already added to the blocking collection.
- Timer2 just logs the counts of both the BlockingCollection and _tracker.
- The START button kicks off a Parallel.ForEach that simply iterates over the blocking collection's GetConsumingEnumerable() and prints each item to the second list box.
- The STOP button stops Timer1, preventing more entries from being added to the blocking collection.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

public partial class Form1 : Form
{
    private int Counter = 0;
    private BlockingCollection<int> _entries;
    private ConcurrentDictionary<int, int> _tracker;
    private CancellationTokenSource _tokenSource;
    private TaskFactory _factory;

    public Form1()
    {
        _entries = new BlockingCollection<int>();
        _tracker = new ConcurrentDictionary<int, int>();
        _tokenSource = new CancellationTokenSource();
        _factory = new TaskFactory();
        InitializeComponent();
    }

    private void timer1_Tick(object sender, EventArgs e)
    { //ADDING TIMER -> LISTBOX 1
        for (var i = 0; i < 3; i++, Counter++)
        {
            if (_tracker.TryAdd(Counter, Counter))
                _entries.Add(Counter);
            listBox1.Items.Add(string.Format("Adding {0}", Counter));
        }
    }

    private void timer2_Tick_1(object sender, EventArgs e)
    { //LOGGING TIMER -> LIST BOX 3
        listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
    }

    private void button1_Click(object sender, EventArgs e)
    { //START BUTTON -> LOGS TO LIST BOX 2
        var options = new ParallelOptions {
            CancellationToken = _tokenSource.Token,
            MaxDegreeOfParallelism = 1
        };
        // Consume _entries on a background task so the UI thread stays responsive.
        _factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });
        timer1.Enabled = timer2.Enabled = true;
        timer1.Start();
        timer2.Start();
    }

    private void DoWork(int entry)
    {
        Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
        // DoWork runs on a worker thread, so the list box update is marshalled back to the UI thread.
        Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
        int oldEntry;
        _tracker.TryRemove(entry, out oldEntry);
    }

    private void button2_Click(object sender, EventArgs e)
    { //STOP BUTTON
        timer1.Stop();
        timer1.Enabled = false;
    }
}
The sequence of events is as follows:
- Press Start
- Timer1 ticks & ListBox1 is immediately updated with 3 messages (Adding 0, 1, 2)
- ListBox2 is subsequently updated with 3 messages, 1 second apart:
- Processing 0
- Processing 1
- Processing 2
- Processing 3
- Processing 4
- Processing 5 is not printed... it would appear to have gone "missing"
You can see that the concurrent dictionary is still tracking 1 item that has not yet been processed and subsequently removed from _tracker.
If I press Start again, Timer1 begins adding 3 more entries and the Parallel loop comes back to life, printing 5, 6, 7 & 8.
I'm at a complete loss as to why this occurs. Calling Start again obviously creates a new task, which calls Parallel.ForEach and re-executes GetConsumingEnumerable(), which magically finds the missing entry... I
Why is BlockingCollection.GetConsumingEnumerable() not guaranteed to iterate over every item that's added to the collection?
Why does adding more entries subsequently cause it to get unstuck and continue its processing?
Recommended answer
You can't use GetConsumingEnumerable() in Parallel.ForEach().
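GetConsumingEnumerable() on its own does hand over every item as it is taken; the trouble is the way Parallel.ForEach pulls from it. As a minimal sketch (PlainConsumerDemo and RunLockStep are hypothetical names, and the acknowledgement collection is an illustrative device that is not part of the question), the producer below adds one item at a time and waits for the consumer to acknowledge it before adding the next. With a plain foreach consumer, each acknowledgement should arrive, so RunLockStep is expected to return true.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class PlainConsumerDemo
{
    // Adds `count` items one at a time and does not add the next item until the
    // consumer has acknowledged the previous one. Returns false if an
    // acknowledgement does not arrive within `timeout`, i.e. if the consumer
    // is holding items back instead of processing them.
    public static bool RunLockStep(int count, TimeSpan timeout)
    {
        var entries = new BlockingCollection<int>();
        var acks = new BlockingCollection<int>();

        // Consumer: a plain foreach over GetConsumingEnumerable(). Each iteration
        // takes exactly one item, blocking while the collection is empty.
        Task consumer = Task.Run(() =>
        {
            foreach (int entry in entries.GetConsumingEnumerable())
            {
                acks.Add(entry); // acknowledge the item immediately
            }
        });

        bool allAcked = true;
        for (int i = 0; i < count; i++)
        {
            entries.Add(i);
            if (!acks.TryTake(out int seen, timeout) || seen != i)
            {
                allAcked = false;
                break;
            }
        }

        entries.CompleteAdding(); // lets the consumer's foreach finish
        consumer.Wait();
        return allAcked;
    }
}
```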
Use GetConsumingPartitioner from the TPL Extras (ParallelExtensionsExtras).
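The idea behind a consuming partitioner can be sketched as follows. This is an illustrative reconstruction, not the ParallelExtensionsExtras source; the names ConsumingPartitionerExtensions and ConsumingPartitioner are hypothetical. The partitioner reports SupportsDynamicPartitions, which Parallel.ForEach requires of a custom Partitioner<T>, and gives every worker its own GetConsumingEnumerable() enumerator, so each worker takes items one at a time and never waits to fill a chunk.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch of a consuming partitioner; the real library code may differ.
public static class ConsumingPartitionerExtensions
{
    // Wraps a BlockingCollection<T> so Parallel.ForEach consumes it one item
    // at a time rather than in chunks.
    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        return new ConsumingPartitioner<T>(collection);
    }

    private sealed class ConsumingPartitioner<T> : Partitioner<T>
    {
        private readonly BlockingCollection<T> _collection;

        public ConsumingPartitioner(BlockingCollection<T> collection)
        {
            _collection = collection ?? throw new ArgumentNullException(nameof(collection));
        }

        // Parallel.ForEach only accepts custom partitioners that support dynamic partitions.
        public override bool SupportsDynamicPartitions => true;

        // Static partitioning: hand out partitionCount independent consumers.
        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1)
                throw new ArgumentOutOfRangeException(nameof(partitionCount));

            IEnumerable<T> consuming = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount)
                             .Select(_ => consuming.GetEnumerator())
                             .ToList();
        }

        // Every GetEnumerator() call on this sequence is a separate consumer that
        // blocks on the shared collection for each item; nothing is buffered.
        public override IEnumerable<T> GetDynamicPartitions()
        {
            return _collection.GetConsumingEnumerable();
        }
    }
}
```

In the question's button1_Click, the consuming line would then read Parallel.ForEach(_entries.GetConsumingPartitioner(), options, DoWork);. The trade-off is one blocking take per item instead of one lock per chunk, which is exactly the synchronization cost chunking tries to avoid; for work items that sleep for a second, that cost should be negligible.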
The TPL Extras blog post also explains why you can't use GetConsumingEnumerable():
The partitioning algorithm employed by default by both Parallel.ForEach and PLINQ use chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.
In other words, Parallel.ForEach waits until it has received a group (chunk) of work items before continuing, which is exactly what your experiment shows.
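On .NET Framework 4.5 and later, a similar effect is available without a custom partitioner: Partitioner.Create with EnumerablePartitionerOptions.NoBuffering makes Parallel.ForEach take items from the enumerable one at a time. The sketch below (NoBufferingDemo and RunLockStep are hypothetical names) runs a lock-step producer, which adds the next item only after the previous one has been acknowledged, against Parallel.ForEach with MaxDegreeOfParallelism = 1 as in the question. With the default chunking partitioner such a producer can stall; with NoBuffering, each item is expected to be acknowledged before the next is added.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class NoBufferingDemo
{
    // Returns true if every item is acknowledged by the Parallel.ForEach consumer
    // within `timeout` before the producer adds the next one.
    public static bool RunLockStep(int count, TimeSpan timeout)
    {
        var entries = new BlockingCollection<int>();
        var acks = new BlockingCollection<int>();

        // NoBuffering: workers take items from the source one at a time instead
        // of waiting to fill a chunk (requires .NET Framework 4.5 or later).
        var source = Partitioner.Create(
            entries.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        // Mirrors the question's options: a single worker.
        var options = new ParallelOptions { MaxDegreeOfParallelism = 1 };

        Task consumer = Task.Run(() =>
        {
            Parallel.ForEach(source, options, entry => acks.Add(entry));
        });

        bool allAcked = true;
        for (int i = 0; i < count; i++)
        {
            entries.Add(i);
            if (!acks.TryTake(out int seen, timeout) || seen != i)
            {
                allAcked = false;
                break;
            }
        }

        entries.CompleteAdding(); // ends the consuming enumerable, so the loop returns
        consumer.Wait();
        return allAcked;
    }
}
```

Applied to the question, the consuming line would become Parallel.ForEach(Partitioner.Create(_entries.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering), options, DoWork);.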