Why does iterating over GetConsumingEnumerable() not fully empty the underlying blocking collection?

Problem description

I have a quantifiable & repeatable problem using the Task Parallel Library, BlockingCollection<T>, ConcurrentQueue<T> & GetConsumingEnumerable while trying to create a simple pipeline.

In a nutshell, adding entries to a default BlockingCollection<T> (which under the hood is relying on a ConcurrentQueue<T>) from one thread, does not guarantee that they will be popped off the BlockingCollection<T> from another thread calling the GetConsumingEnumerable() Method.

I've created a very simple Winforms Application to reproduce/simulate this which just prints integers to the screen.


  • Timer1 is responsible for queueing up the work items... It uses a concurrent dictionary called _tracker so that it knows what it has already added to the blocking collection.
  • Timer2 is just logging the count state of both the BlockingCollection & of the _tracker
  • The START button kicks off a Parallel.ForEach which simply iterates over the blocking collection's GetConsumingEnumerable() and starts printing the entries to the second list box.
  • The STOP button stops Timer1 preventing more entries from being added to the blocking collection.
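
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Windows.Forms;

public partial class Form1 : Form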
{
    private int Counter = 0;
    private BlockingCollection<int> _entries;
    private ConcurrentDictionary<int, int> _tracker;
    private CancellationTokenSource _tokenSource;
    private TaskFactory _factory;

    public Form1()
    {
        _entries = new BlockingCollection<int>();
        _tracker = new ConcurrentDictionary<int, int>();
        _tokenSource = new CancellationTokenSource();
        _factory = new TaskFactory(); 
        InitializeComponent();
    }

    private void timer1_Tick(object sender, EventArgs e)
    { //ADDING TIMER -> LISTBOX 1
        for (var i = 0; i < 3; i++, Counter++)
        {
            // Only queue entries that the tracker has not seen yet.
            if (_tracker.TryAdd(Counter, Counter))
            {
                _entries.Add(Counter);
            }
            listBox1.Items.Add(string.Format("Adding {0}", Counter));
        }
    }

    private void timer2_Tick_1(object sender, EventArgs e)
    { //LOGGING TIMER -> LIST BOX 3
        listBox3.Items.Add(string.Format("Tracker Count : {0} / Entries Count : {1}", _tracker.Count, _entries.Count));
    }

    private void button1_Click(object sender, EventArgs e)
    { //START BUTTON -> LOGS TO LIST BOX 2

        var options = new ParallelOptions {
                                CancellationToken = _tokenSource.Token,
                                MaxDegreeOfParallelism = 1
                            };

        _factory.StartNew(() => { Parallel.ForEach(_entries.GetConsumingEnumerable(), options, DoWork); });

        timer1.Enabled = timer2.Enabled = true;
        timer1.Start();
        timer2.Start();
    }

    private void DoWork(int entry)
    {
        Thread.Sleep(1000); //Sleep for 1 second to simulate work being done.
        Invoke((MethodInvoker)(() => listBox2.Items.Add(string.Format("Processed {0}", entry))));
        int oldEntry;
        _tracker.TryRemove(entry, out oldEntry);
    }

    private void button2_Click(object sender, EventArgs e)
    { //STOP BUTTON
        timer1.Stop();
        timer1.Enabled = false;
    }
}

The sequence of events is as follows:

  • Press Start
  • Timer1 ticks & ListBox1 is immediately updated with 3 messages (Adding 0, 1, 2)
  • ListBox2 is subsequently updated with 3 messages, 1 second apart
    • Processing 0
    • Processing 1
    • Processing 2

    • Processing 3
    • Processing 4
    • Processing 5 is not printed... it would appear to have gone "missing"

Missing entries

You can see that the concurrent dictionary is still tracking that 1 item has not yet been processed & subsequently removed from _tracker.

If I press Start again, then timer1 begins adding 3 more entries and the Parallel loop comes back to life, printing 5, 6, 7 & 8.

I'm at a complete loss as to why this occurs. Calling Start again obviously starts a new task, which calls Parallel.ForEach and re-executes GetConsumingEnumerable(), which magically finds the missing entry... I

Why is BlockingCollection.GetConsumingEnumerable() not guaranteed to iterate over every item that's added to the collection?

Why does adding more entries subsequently cause it to get "unstuck" and continue with its processing?

Recommended answer

You can't use GetConsumingEnumerable() with Parallel.ForEach().

Use the GetConsumingPartitioner from the TPL extras.
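
For readers who don't have the extras library to hand, here is a rough sketch of what such a consuming partitioner can look like. It is an illustration only, not the library's actual source: the class names ConsumingPartitioner and PartitionerDemo are made up for this example, and the real GetConsumingPartitioner may differ in detail. The idea is to hand Parallel.ForEach a Partitioner<T> whose dynamic partitions are just the consuming enumerable itself, so each worker pulls one item at a time instead of waiting for a chunk.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BlockingCollectionExtensions
{
    // Sketch of an extension method in the spirit of GetConsumingPartitioner.
    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        return new ConsumingPartitioner<T>(collection);
    }

    // Hypothetical helper class; the name is an assumption for this example.
    private sealed class ConsumingPartitioner<T> : Partitioner<T>
    {
        private readonly BlockingCollection<T> _collection;

        internal ConsumingPartitioner(BlockingCollection<T> collection)
        {
            if (collection == null) throw new ArgumentNullException("collection");
            _collection = collection;
        }

        // Parallel.ForEach requires dynamic partitions from a custom partitioner.
        public override bool SupportsDynamicPartitions { get { return true; } }

        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1) throw new ArgumentOutOfRangeException("partitionCount");
            var dynamicPartitions = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount)
                             .Select(_ => dynamicPartitions.GetEnumerator())
                             .ToArray();
        }

        // No chunking: every worker takes items one by one from the collection.
        public override IEnumerable<T> GetDynamicPartitions()
        {
            return _collection.GetConsumingEnumerable();
        }
    }
}

public static class PartitionerDemo
{
    // Hypothetical demo: queues 6 items, drains them, returns how many were processed.
    public static int Run()
    {
        var entries = new BlockingCollection<int>();
        for (var i = 0; i < 6; i++) entries.Add(i);
        entries.CompleteAdding(); // lets the loop end once the queue is empty

        var processed = 0;
        Parallel.ForEach(entries.GetConsumingPartitioner(),
                         entry => Interlocked.Increment(ref processed));
        return processed;
    }
}
```

In the form from the question, the equivalent change would be to pass _entries.GetConsumingPartitioner() to Parallel.ForEach in place of _entries.GetConsumingEnumerable(), keeping options and DoWork as they are.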

In the blog post you will also get an explanation of why you can't use GetConsumingEnumerable():

The partitioning algorithm employed by default by both Parallel.ForEach and PLINQ use chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.

i.e. Parallel.ForEach waits until it receives a group of work items before continuing. That is exactly what your experiment shows.
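
As an aside that is not part of the answer above: on .NET 4.5 and later, the chunking can also be switched off with the built-in Partitioner.Create overload that takes EnumerablePartitionerOptions.NoBuffering. The following is a minimal sketch under that assumption; the class name NoBufferingDemo is made up for the example, and the demo calls CompleteAdding() so that the loop terminates, which the WinForms sample in the question never does.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class NoBufferingDemo
{
    // Hypothetical demo: drains a BlockingCollection<int> without chunking
    // and returns the number of entries processed.
    public static int Run()
    {
        var entries = new BlockingCollection<int>();
        for (var i = 0; i < 6; i++) entries.Add(i);
        entries.CompleteAdding(); // lets the loop end once the queue is empty

        var options = new ParallelOptions { MaxDegreeOfParallelism = 1 };

        // NoBuffering makes each worker take a single item at a time,
        // instead of blocking until a whole chunk has been collected.
        var partitioner = Partitioner.Create(
            entries.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        var processed = 0;
        Parallel.ForEach(partitioner, options,
                         entry => Interlocked.Increment(ref processed));
        return processed;
    }
}
```

Applied to the question's code, this would mean wrapping _entries.GetConsumingEnumerable() in Partitioner.Create(..., EnumerablePartitionerOptions.NoBuffering) before handing it to Parallel.ForEach. With MaxDegreeOfParallelism = 1, as in the question, a plain foreach over GetConsumingEnumerable() on a background task would likely be simpler still.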
