与BlockingCollection集成时Parallel.ForEach停滞 [英] Parallel.ForEach stalled when integrated with BlockingCollection

查看:110
本文介绍了与BlockingCollection集成时Parallel.ForEach停滞的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我根据

这是测试代码

class Program
{
    static void Main(string[] args)
    {
        TestRepeatedEnqueue(100, 1);
    }

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
    {
        bool[] flags = new bool[itemCount];
        var consumer = new ParallelConsumer<int>(parallelCount,
                                              (i) =>
                                              {
                                                  flags[i] = true;
                                              }
            );
        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        Thread.Sleep(1000);
        Debug.Assert(flags.All(b => b == true));



    }
}

测试始终失败-它始终停留在被测试的100个项目中的第93个项目附近.知道我的代码的哪一部分导致了此问题,以及如何解决此问题?

The test always fails - it always stuck at around 93th-item from the 100 tested. Any idea which part of my code caused this issue, and how to fix it?

推荐答案

您不能将Parallel.Foreach()BlockingCollection.GetConsumingEnumerable()一起使用,就像您发现的那样.

You cannot use Parallel.Foreach() with BlockingCollection.GetConsumingEnumerable(), as you have discovered.

有关说明,请参阅此博客文章:

For an explanation, see this blog post:

http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

该博客还提供了名为GetConsumingPartitioner()的方法的源代码,可用于解决该问题.

That blog also provides the source code for a method called GetConsumingPartitioner() which you can use to solve the problem.

博客摘录:

BlockingCollection的GetConsumingEnumerable实现正在使用BlockingCollection的内部同步,该同步已同时支持多个使用者,但是ForEach并不知道,其可枚举分区逻辑在访问可枚举时也需要锁定.

BlockingCollection’s GetConsumingEnumerable implementation is using BlockingCollection’s internal synchronization which already supports multiple consumers concurrently, but ForEach doesn’t know that, and its enumerable-partitioning logic also needs to take a lock while accessing the enumerable.

因此,此处的同步比实际所需的更多,从而导致性能损失可能微不足道.

As such, there’s more synchronization here than is actually necessary, resulting in a potentially non-negligable performance hit.

[也] Parallel.ForEach和PLINQ默认使用的分区算法使用分块以最大程度地降低同步成本:与其对每个元素获取一次锁,不如对每个元素获取锁,而是获取锁,获取一组元素(块),然后释放锁.

[Also] the partitioning algorithm employed by default by both Parallel.ForEach and PLINQ use chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.

尽管此设计可以帮助提高整体吞吐量,但对于更注重低延迟的方案,分块可能会令人望而却步.

While this design can help with overall throughput, for scenarios that are focused more on low latency, that chunking can be prohibitive.

这篇关于与BlockingCollection集成时Parallel.ForEach停滞的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆