Parallel.ForEach stalled when integrated with BlockingCollection

Problem description

I based my parallel producer/consumer implementation on the code in this question: http://codereview.stackexchange.com/questions/8972/is-this-a-valid-producer-consumer-implementation-for-parallel-processing-in-th

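using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class ParallelConsumer<T> : IDisposable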
{
    private readonly int _maxParallel;
    private readonly Action<T> _action;
    private readonly TaskFactory _factory = new TaskFactory();
    private CancellationTokenSource _tokenSource;
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
    private Task _task;

    public ParallelConsumer(int maxParallel, Action<T> action)
    {
        _maxParallel = maxParallel;
        _action = action;
    }

    public void Start()
    {
        try
        {
            _tokenSource = new CancellationTokenSource();
            _task = _factory.StartNew(
                () =>
                {
                    Parallel.ForEach(
                        _entries.GetConsumingEnumerable(),
                        new ParallelOptions { MaxDegreeOfParallelism = _maxParallel, CancellationToken = _tokenSource.Token },
                        (item, loopState) =>
                        {
                            Log("Taking" + item);
                            if (!_tokenSource.IsCancellationRequested)
                            {
                                _action(item);
                                Log("Finished" + item);
                            }
                            else
                            {
                                Log("Not Taking" + item);
                                _entries.CompleteAdding();
                                loopState.Stop();
                            }
                        });
                },
                _tokenSource.Token);
        }
        catch (OperationCanceledException oce)
        {
            System.Diagnostics.Debug.WriteLine(oce);
        }
    }

    private void Log(string message)
    {
        Console.WriteLine(message);
    }

    public void Stop()
    {
        Dispose();
    }

    public void Enqueue(T entry)
    {
        Log("Enqueuing" + entry);
        _entries.Add(entry);
    }

    public void Dispose()
    {
        if (_task == null)
        {
            return;
        }

        _tokenSource.Cancel();
        while (!_task.IsCanceled)
        {
        }

        _task.Dispose();
        _tokenSource.Dispose();
        _task = null;
    }
}

And here is the test code:

using System.Diagnostics;
using System.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        TestRepeatedEnqueue(100, 1);
    }

    private static void TestRepeatedEnqueue(int itemCount, int parallelCount)
    {
        bool[] flags = new bool[itemCount];
        var consumer = new ParallelConsumer<int>(parallelCount,
                                              (i) =>
                                              {
                                                  flags[i] = true;
                                              }
            );
        consumer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            consumer.Enqueue(i);
        }
        Thread.Sleep(1000);
        Debug.Assert(flags.All(b => b == true));
    }
}

The test always fails: processing always gets stuck at around the 93rd of the 100 items. Any idea which part of my code causes this, and how to fix it?

Recommended answer

You cannot use Parallel.ForEach() directly with BlockingCollection.GetConsumingEnumerable(), as you have discovered.

For an explanation, see this blog post:

http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

That blog post also provides the source code for a method called GetConsumingPartitioner(), which you can use to solve the problem.
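A minimal sketch of such a partitioner is shown below. It is written from the idea the blog post describes rather than copied from it, so details may differ from the post's code; the class names BlockingCollectionExtensions, BlockingCollectionPartitioner<T> and ConsumingPartitionerDemo are hypothetical. The partitioner reports SupportsDynamicPartitions = true and hands every worker its own enumerator over GetConsumingEnumerable(), so each MoveNext() takes exactly one item through BlockingCollection's own thread-safe Take and nothing is held back in chunks.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class; GetConsumingPartitioner mirrors the blog's method name.
public static class BlockingCollectionExtensions
{
    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        return new BlockingCollectionPartitioner<T>(collection);
    }

    private sealed class BlockingCollectionPartitioner<T> : Partitioner<T>
    {
        private readonly BlockingCollection<T> _collection;

        internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
        {
            _collection = collection ?? throw new ArgumentNullException(nameof(collection));
        }

        // Parallel.ForEach only accepts a custom Partitioner<T> that supports dynamic partitions.
        public override bool SupportsDynamicPartitions => true;

        // Static partitioning (used by PLINQ): all partitions draw from the same collection.
        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1) throw new ArgumentOutOfRangeException(nameof(partitionCount));
            IEnumerable<T> dynamicPartitions = GetDynamicPartitions();
            return Enumerable.Range(0, partitionCount)
                             .Select(_ => dynamicPartitions.GetEnumerator())
                             .ToArray();
        }

        // Each worker enumerates its own consuming enumerator; BlockingCollection
        // already synchronizes concurrent takers, so no extra lock or chunk buffer.
        public override IEnumerable<T> GetDynamicPartitions()
        {
            return _collection.GetConsumingEnumerable();
        }
    }
}

// Hypothetical demo mirroring the question's test: items are added without
// calling CompleteAdding first, then we count how many were processed.
public static class ConsumingPartitionerDemo
{
    public static int ProcessedCount(int itemCount, int parallelCount)
    {
        var flags = new bool[itemCount];
        using (var entries = new BlockingCollection<int>())
        using (var done = new CountdownEvent(itemCount))
        {
            Task loop = Task.Run(() => Parallel.ForEach(
                entries.GetConsumingPartitioner(),
                new ParallelOptions { MaxDegreeOfParallelism = parallelCount },
                item => { flags[item] = true; done.Signal(); }));

            for (int i = 0; i < itemCount; i++) entries.Add(i);

            done.Wait(TimeSpan.FromSeconds(5)); // times out if items are held back
            int processed = flags.Count(f => f);

            entries.CompleteAdding(); // lets every consuming enumerator finish
            loop.Wait();
            return processed;
        }
    }
}
```

With this partitioner, all 100 items should be processed before CompleteAdding is called, which is exactly what the question's test expects.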

An excerpt from the blog post:

BlockingCollection’s GetConsumingEnumerable implementation is using BlockingCollection’s internal synchronization which already supports multiple consumers concurrently, but ForEach doesn’t know that, and its enumerable-partitioning logic also needs to take a lock while accessing the enumerable.

As such, there’s more synchronization here than is actually necessary, resulting in a potentially non-negligible performance hit.

[Also] the partitioning algorithm employed by default by both Parallel.ForEach and PLINQ uses chunking in order to minimize synchronization costs: rather than taking the lock once per element, it'll take the lock, grab a group of elements (a chunk), and then release the lock.

While this design can help with overall throughput, for scenarios that are focused more on low latency, that chunking can be prohibitive.
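The chunking described above is what holds the last few items back in the question's test: the partitioner waits to fill a chunk that never fills because CompleteAdding is never called. On .NET Framework 4.5 and later, a likely simpler way to switch chunking off is Partitioner.Create with EnumerablePartitionerOptions.NoBuffering. The sketch below applies that to the question's Start() loop; NoBufferingConsumer<T> and NoBufferingDemo are hypothetical names, and cancellation and Dispose are omitted for brevity.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, reduced version of the question's ParallelConsumer<T>
// (no cancellation or Dispose), showing only the partitioning fix.
public sealed class NoBufferingConsumer<T>
{
    private readonly BlockingCollection<T> _entries = new BlockingCollection<T>();
    private readonly int _maxParallel;
    private readonly Action<T> _action;
    private Task _task;

    public NoBufferingConsumer(int maxParallel, Action<T> action)
    {
        _maxParallel = maxParallel;
        _action = action;
    }

    public void Start()
    {
        // NoBuffering makes each worker fetch one element per MoveNext()
        // instead of grabbing a chunk, so later items are not held back.
        var source = Partitioner.Create(
            _entries.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        _task = Task.Run(() => Parallel.ForEach(
            source,
            new ParallelOptions { MaxDegreeOfParallelism = _maxParallel },
            _action));
    }

    public void Enqueue(T entry) => _entries.Add(entry);

    // Signals that no more items will arrive and waits for the loop to drain.
    public void Complete()
    {
        _entries.CompleteAdding();
        _task?.Wait();
    }
}

// Hypothetical demo mirroring the question's test.
public static class NoBufferingDemo
{
    public static bool ProcessesAllItems(int itemCount, int parallelCount)
    {
        var flags = new bool[itemCount];
        using (var done = new CountdownEvent(itemCount))
        {
            var consumer = new NoBufferingConsumer<int>(parallelCount, i => { flags[i] = true; done.Signal(); });
            consumer.Start();
            for (int i = 0; i < itemCount; i++) consumer.Enqueue(i);

            bool allSignaled = done.Wait(TimeSpan.FromSeconds(5)); // false if items are held back
            consumer.Complete();
            return allSignaled && Array.TrueForAll(flags, f => f);
        }
    }
}
```

This only removes the chunking; the partitioner most likely still serializes access to the shared enumerator with its own lock, so the redundant synchronization mentioned in the first quoted paragraph probably remains. A custom partitioner like GetConsumingPartitioner() avoids both.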
