当使用ConcurrentQueue时,尝试在并行循环时出队 [英] While using ConcurrentQueue, trying to dequeue while looping through in parallel

查看:2990
本文介绍了当使用ConcurrentQueue时,尝试在并行循环时出队的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在我的.NET 4应用程序中使用并行数据结构,我有一个 ConcurrentQueue ,在我通过它处理时被添加。

I am using the parallel data structures in my .NET 4 application and I have a ConcurrentQueue that gets added to while I am processing through it.

我想做一些类似的事情:

I want to do something like:

personqueue.AsParallel()。WithDegreeOfParallelism(20).ForAll (i => ...);

,因为我进行数据库调用来保存数据,线程。

as I make database calls to save the data, so I am limiting the number of concurrent threads.

但是,我期望 ForAll 不会出队,做

But, I expect that the ForAll isn't going to dequeue, and I am concerned about just doing

ForAll(i => {
    personqueue.personqueue.TryDequeue(...);
    ...
});

,因为不能保证我会弹出正确的。

as there is no guarantee that I am popping off the correct one.

所以,我如何以并行方式遍历集合和出队。

So, how can I iterate through the collection and dequeue, in a parallel fashion.

或者,最好使用<$

Or, would it be better to use PLINQ to do this processing, in parallel?

推荐答案

100%肯定你试图归档这里。你试图去除所有项目,直到没有剩下的东西?

Well I'm not 100% sure what you try to archive here. Are you trying to just dequeue all items until nothing is left? Or just dequeue lots of items in one go?

第一个大概意外的行为开始于这个语句:

The first probably unexpected behavior starts with this statement:

 theQueue.AsParallel()

对于ConcurrentQueue, '快照' - Enumerator。所以当你迭代一个并发堆栈,你只需要遍历快照,没有'活'队列。

For a ConcurrentQueue, you get a 'Snapshot'-Enumerator. So when you iterate over a concurrent stack, you only iterate over the snapshot, no the 'live' queue.

一般来说,我认为这不是一个好主意迭代

In general I think it's not a good idea to iterate over something you're changing during the iteration.

因此,另一种解决方案如下所示:

So another solution would look like this:

        // this way it's more clear, that we only deque for theQueue.Count items
        // However after this, the queue is probably not empty
        // or maybe the queue is also empty earlier   
        Parallel.For(0, theQueue.Count,
                     new ParallelOptions() {MaxDegreeOfParallelism = 20},
                     () => { 
                         theQueue.TryDequeue(); //and stuff
                     });

这样可以避免在迭代时操作。然而,在该语句之后,队列仍然可以包含在for循环期间添加的数据。

This avoids manipulation something while iterating over it. However, after that statement, the queue can still contain data, which was added during the for-loop.

为了获得队列空的时间,你可能需要一个多一点工作。这是一个非常丑的解决方案。当队列仍有项目时,创建新任务。每个任务开始从队列中出队,只要它可以。最后,我们等待所有任务结束。为了限制并行性,我们从不创建超过20个任务。

To get the queue empty for moment in time you probably need a little more work. Here's an really ugly solution. While the queue has still items, create new tasks. Each task start do dequeue from the queue as long as it can. At the end, we wait for all tasks to end. To limit the parallelism, we never create more than 20-tasks.

        // Probably a kitty died because of this ugly code ;)
        // However, this code tries to get the queue empty in a very aggressive way
        Action consumeFromQueue = () =>
                                      {
                                          while (tt.TryDequeue())
                                          {
                                              ; // do your stuff
                                          }
                                      };
        var allRunningTasks = new Task[MaxParallism];
        for(int i=0;i<MaxParallism && tt.Count>0;i++)
        {
            allRunningTasks[i] = Task.Factory.StartNew(consumeFromQueue);  
        }
        Task.WaitAll(allRunningTasks);

这篇关于当使用ConcurrentQueue时,尝试在并行循环时出队的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆