如何消耗BlockingCollection< T>分批 [英] How to consume a BlockingCollection<T> in batches

查看:88
本文介绍了如何消耗BlockingCollection< T>分批的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我已经想出了一些code消耗从队列中的所有瓦亭项目。而不是1加工项目1,是有意义的处理都在等着项目为一组。

I've come up with some code to consume all wating items from a queue. Rather than processing the items 1 by 1, it makes sense to process all waiting items as a set.

我宣布我的队列是这样的。

I've declared my queue like this.

private BlockingCollection<Item> items = 
    new BlockingCollection<Item>(new ConcurrentQueue<Item>);

然后,在一个消费者线程,我打算读的项目在这样的批次,

Then, on a consumer thread, I plan to read the items in batches like this,

Item nextItem;
while (this.items.TryTake(out nextItem, -1))
{
    var workToDo = new List<Item>();
    workToDo.Add(nextItem);

    while(this.items.TryTake(out nextItem))
    {
        workToDo.Add(nextItem);
    }

    // process workToDo, then go back to the queue.
}

此方法缺乏 GetConsumingEnumerable ,我不禁想,如果我错过了一个更好的办法,如果我的做法是错误的。

This approach lacks the utility of GetConsumingEnumerable and I can't help wondering if I've missed a better way, or if my approach is flawed.

有没有更好的方式来消耗 BlockingCollection&LT; T&GT; 分批

Is there a better way to consume a BlockingCollection<T> in batches?

推荐答案

虽然不如 ConcurrentQueue&LT; T&GT; 在某些方面,我自己的 LLQueue&LT; T&GT; 允许成批出列与<一个href="http://hackcraft.github.com/Ariadne/documentation/html/M_Ariadne_Collections_LLQueue_1_AtomicDequeueAll.htm"相对=nofollow> AtomicDequeueAll 的方法,其中,当前的所有项目的队列,从它发生在一个单一的(原子和线程安全的)的操作,并随后在非线程收集消费由单个线程。这种方法的目的是precisely为要批量读取操作的方案。

While not as good as ConcurrentQueue<T> in some ways, my own LLQueue<T> allows for a batched dequeue with a AtomicDequeueAll method where all items currently on the queue are taken from it in a single (atomic and thread-safe) operation, and are then in a non-threadsafe collection for consumption by a single thread. This method was designed precisely for the scenario where you want to batch the read operations.

这是不堵,虽然,虽然它可以被用来创建一个阻塞收集足够容易:

This isn't blocking though, though it could be used to create a blocking collection easily enough:

public BlockingBatchedQueue<T>
{
  private readonly AutoResetEvent _are = new AutoResetEvent(false);
  private readonly LLQueue<T> _store;
  public void Add(T item)
  {
    _store.Enqueue(item);
    _are.Set();
  }
  public IEnumerable<T> Take()
  {
    _are.WaitOne();
    return _store.AtomicDequeueAll();
  }
  public bool TryTake(out IEnumerable<T> items, int millisecTimeout)
  {
    if(_are.WaitOne(millisecTimeout))
    {
      items = _store.AtomicDequeueAll();
      return true;
    }
    items = null;
    return false;
  }
}

这是一个起点,没有做到以下几点:

That's a starting point that doesn't do the following:

  1. 在应对处置时挂起等待的读者。
  2. 在担心与多个阅读器既是一个潜在的竞争正在由一个写发生,而一个正在读触发(它只是认为偶尔空的结果枚举会好起来的)。
  3. 将任何上限写作。

所有这一切都可以增加太多,但我想保持到最低限度的一些实际应用,有希望不在上述规定的限制马车。

All of which could be added too, but I wanted to keep to the minimum of some practical use, that hopefully isn't buggy within the defined limitations above.

这篇关于如何消耗BlockingCollection&LT; T&GT;分批的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆