以原子方式从 ConcurrentQueue 中获取所有内容 [英] Atomically taking everything from a ConcurrentQueue

查看:28
本文介绍了以原子方式从 ConcurrentQueue 中获取所有内容的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有多个线程生成项目并将它们粘贴在一个公共 ConcurrentQueue 中:

I have multiple threads generating items and sticking them in a common ConcurrentQueue:

private ConcurrentQueue<GeneratedItem> queuedItems = new ConcurrentQueue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    queuedItems.Enqueue(new GeneratedItem(...));
    // ...
}

我有另一个单一的消费者线程,但它需要在这个应用程序的上下文中工作的方式是,偶尔,它只需要获取当前线程队列中的所有内容,然后将其从中删除排队,一气呵成.比如:

I have another single consumer thread but the way it needs to work in the context of this application is, occasionally, it just needs to grab everything currently in the threads' queue, removing it from that queue, all in one shot. Something like:

private Queue<GeneratedItem> GetAllNewItems () {

    return queuedItems.TakeEverything(); // <-- not a real method

}

我想我查看了所有文档(关于集合及其实现的接口),但我似乎没有找到类似同时从队列中获取所有对象",甚至同时与另一个队列交换内容"之类的东西.

I think I looked through all the documentation (for the collection and its implemented interfaces) but I didn't seem to find anything like a "concurrently take all objects from queue", or even "concurrently swap contents with another queue".

如果我放弃 ConcurrentQueue 并使用 lock 保护普通的 Queue,我可以做到这一点,如下所示:

I could do this no problem if I ditch the ConcurrentQueue and just protect a normal Queue with a lock, like this:

private Queue<GeneratedItem> queuedItems = new Queue<GeneratedItem>();

private void BunchOfThreads () {
    // ...
    lock (queuedItems) {
        queuedItems.Enqueue(new GeneratedItem(...));
    }
    // ...
}

private Queue<GeneratedItem> GetAllNewItems () {

    lock (queuedItems) {
        Queue<GeneratedItem> newItems = new Queue<Event>(queuedItems);
        queuedItems.Clear();
        return newItems;
    }

}

但是,我喜欢 ConcurrentQueue 的便利性,而且因为我刚刚学习 C#,所以我对 API 很好奇;所以我的问题是,有没有办法用其中一个并发集合来做到这一点?

But, I like the convenience of the ConcurrentQueue and also since I'm just learning C# I'm curious about the API; so my question is, is there a way to do this with one of the concurrent collections?

是否有某种方法可以访问 ConcurrentQueue 使用的任何同步对象,并为我自己的目的将其锁定,以便一切都很好地协同工作?然后我可以锁定它,拿走所有东西,然后释放?

Is there perhaps some way to access whatever synchronization object ConcurrentQueue uses and lock it for myself for my own purposes so that everything plays nicely together? Then I can lock it, take everything, and release?

推荐答案

这取决于你想做什么.根据源代码

It depends what you want to do. As per the comments in the source code

//number of snapshot takers, GetEnumerator(), ToList() and ToArray() operations take snapshot.

这是通过内部调用 ToList() 来工作的m_numSnapshotTakers 和一个 spin 机制

This works by internally calling ToList() which in turn works on m_numSnapshotTakers and a spin mechanism

/// Copies the <see cref="ConcurrentQueue{T}"/> elements to a new <see
/// cref="T:System.Collections.Generic.List{T}"/>.
/// </summary>
/// <returns>A new <see cref="T:System.Collections.Generic.List{T}"/> containing a snapshot of
/// elements copied from the <see cref="ConcurrentQueue{T}"/>.</returns>
private List<T> ToList()
{
   // Increments the number of active snapshot takers. This increment must happen before the snapshot is 
   // taken. At the same time, Decrement must happen after list copying is over. Only in this way, can it
   // eliminate race condition when Segment.TryRemove() checks whether m_numSnapshotTakers == 0. 
   Interlocked.Increment(ref m_numSnapshotTakers);

   List<T> list = new List<T>();
   try
   {
       //store head and tail positions in buffer, 
       Segment head, tail;
       int headLow, tailHigh;
       GetHeadTailPositions(out head, out tail, out headLow, out tailHigh);

       if (head == tail)
       {
           head.AddToList(list, headLow, tailHigh);
       }
       else
       {
           head.AddToList(list, headLow, SEGMENT_SIZE - 1);
           Segment curr = head.Next;
           while (curr != tail)
           {
               curr.AddToList(list, 0, SEGMENT_SIZE - 1);
               curr = curr.Next;
           }
           //Add tail segment
           tail.AddToList(list, 0, tailHigh);
       }
   }
   finally
   {
       // This Decrement must happen after copying is over. 
       Interlocked.Decrement(ref m_numSnapshotTakers);
   }
   return list;
}

如果您只需要快照,那么您很幸运.但是,似乎没有内置方法可以以线程安全的方式从 ConcurrentQueue 中获取和删除所有项目.您将需要使用 lock 或类似方法来烘焙自己的同步.或者自己动手(从源头上看可能并不难).

If a snapshot is all you want, then you are in luck. However, there is seemingly no built in way to get and remove all the items from a ConcurrentQueue in a thread safe manner. You will need to bake your own synchronisation by using lock or similar. Or roll your own (which might not be all that difficult looking at the source).

这篇关于以原子方式从 ConcurrentQueue 中获取所有内容的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆