How to speed up a chunky BlockingCollection implementation


Question


I have used the BlockingCollection many times for implementing the producer/consumer pattern, but I have experienced bad performance with extremely granular data because of the associated overhead. This usually forces me to improvise by chunkifying/partitioning my data, in other words using a BlockingCollection<T[]> instead of a BlockingCollection<T>. Here is a recent example. This works, but it's ugly and error-prone: I end up using nested loops at both the producer and the consumer, and I must remember to Add whatever is left over at the end of a producer's workload. So I had the idea of implementing a chunky BlockingCollection that handles all these complications internally, while exposing the same simple interface as the existing BlockingCollection. My problem is that I haven't yet managed to match the performance of the complex manual partitioning. My best attempt still pays a performance tax of around +100% for extremely granular data (basically just integer values). So I would like to present here what I have done so far, hoping for advice that will help me close the performance gap.
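To make the pain point concrete, the manual chunking pattern described above looks roughly like this (a minimal sketch with an assumed chunk size and an integer payload, not the code from the linked example):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

const int chunkSize = 1000;
var queue = new BlockingCollection<int[]>();

// Producer: buffer items locally and Add a whole chunk when the buffer
// fills. The final partial chunk must be flushed manually - forgetting
// this silently drops data.
var producer = Task.Run(() =>
{
    var buffer = new List<int>(chunkSize);
    for (int i = 0; i < 10_000; i++)
    {
        buffer.Add(i);
        if (buffer.Count == chunkSize)
        {
            queue.Add(buffer.ToArray());
            buffer.Clear();
        }
    }
    if (buffer.Count > 0) queue.Add(buffer.ToArray()); // easy to forget
    queue.CompleteAdding();
});

// Consumer: nested loops to unpack each chunk back into items.
long sum = 0;
foreach (var chunk in queue.GetConsumingEnumerable())
    foreach (var item in chunk)
        sum += item;

producer.Wait();
Console.WriteLine(sum); // 0 + 1 + ... + 9999 = 49995000
```

The buffering and flushing logic is duplicated at every call site, which is what the wrapper classes below try to hide.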


My best attempt is using a ThreadLocal<List<T>>, so that each thread works on a dedicated chunk, removing any need for locks.

public class ChunkyBlockingCollection1<T>
{
    private readonly BlockingCollection<T[]> _blockingCollection;
    private readonly int _chunkSize;
    private readonly ThreadLocal<List<T>> _chunk;

    public ChunkyBlockingCollection1(int chunkSize)
    {
        _blockingCollection = new BlockingCollection<T[]>();
        _chunkSize = chunkSize;
        _chunk = new ThreadLocal<List<T>>(() => new List<T>(chunkSize), true);
    }

    public void Add(T item)
    {
        var chunk = _chunk.Value;
        chunk.Add(item);
        if (chunk.Count >= _chunkSize)
        {
            _blockingCollection.Add(chunk.ToArray());
            chunk.Clear();
        }
    }

    public void CompleteAdding()
    {
        var chunks = _chunk.Values.ToArray();
        foreach (var chunk in chunks)
        {
            if (chunk.Count > 0) _blockingCollection.Add(chunk.ToArray());
            chunk.Clear();
        }
        _blockingCollection.CompleteAdding();
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
        {
            for (int i = 0; i < chunk.Length; i++)
            {
                yield return chunk[i];
            }
        }
    }
}


My second-best attempt uses a single List<T> as the chunk, accessed by all threads in a thread-safe manner using a lock. Surprisingly, this is only slightly slower than the ThreadLocal<List<T>> solution.

public class ChunkyBlockingCollection2<T>
{
    private readonly BlockingCollection<T[]> _blockingCollection;
    private readonly int _chunkSize;
    private readonly List<T> _chunk;
    private readonly object _locker = new object();

    public ChunkyBlockingCollection2(int chunkSize)
    {
        _blockingCollection = new BlockingCollection<T[]>();
        _chunkSize = chunkSize;
        _chunk = new List<T>(chunkSize);
    }

    public void Add(T item)
    {
        lock (_locker)
        {
            _chunk.Add(item);
            if (_chunk.Count >= _chunkSize)
            {
                _blockingCollection.Add(_chunk.ToArray());
                _chunk.Clear();
            }
        }
    }

    public void CompleteAdding()
    {
        lock (_locker)
        {
            if (_chunk.Count > 0) _blockingCollection.Add(_chunk.ToArray());
            _chunk.Clear();
        }
        _blockingCollection.CompleteAdding();
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var chunk in _blockingCollection.GetConsumingEnumerable())
        {
            for (int i = 0; i < chunk.Length; i++)
            {
                yield return chunk[i];
            }
        }
    }
}


I have also tried to use a ConcurrentBag<T> as the chunk, which resulted in bad performance and a correctness issue (because I didn't use a lock). Another attempt was replacing the lock (_locker) with a SpinLock, with even worse performance. The locking is clearly the root of my problems, because if I remove it completely my class achieves optimal performance. Of course, removing the lock fails miserably with more than one producer.
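For reference, the SpinLock attempt mentioned above would look roughly like this (a sketch with an int payload and assumed names). Note that SpinLock is a mutable struct: it must be stored in a field, never copied, and entered/exited with the documented try/finally pattern.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var collection = new SpinLockChunkCollection(chunkSize: 64);
var producers = new[]
{
    Task.Run(() => { for (int i = 0; i < 5_000; i++) collection.Add(1); }),
    Task.Run(() => { for (int i = 0; i < 5_000; i++) collection.Add(1); })
};
var consumer = Task.Run(() =>
{
    long count = 0;
    foreach (var item in collection.GetConsumingEnumerable()) count += item;
    return count;
});
Task.WaitAll(producers);
collection.CompleteAdding();
long total = consumer.Result;
Console.WriteLine(total); // 10000

class SpinLockChunkCollection
{
    private readonly BlockingCollection<int[]> _queue = new();
    private readonly List<int> _chunk;
    private readonly int _chunkSize;
    private SpinLock _spinLock = new(enableThreadOwnerTracking: false);

    public SpinLockChunkCollection(int chunkSize)
    {
        _chunkSize = chunkSize;
        _chunk = new List<int>(chunkSize);
    }

    public void Add(int item)
    {
        bool lockTaken = false;
        try
        {
            _spinLock.Enter(ref lockTaken);
            _chunk.Add(item);
            if (_chunk.Count >= _chunkSize)
            {
                _queue.Add(_chunk.ToArray());
                _chunk.Clear();
            }
        }
        finally
        {
            if (lockTaken) _spinLock.Exit(useMemoryBarrier: true);
        }
    }

    public void CompleteAdding()
    {
        bool lockTaken = false;
        try
        {
            _spinLock.Enter(ref lockTaken);
            if (_chunk.Count > 0) _queue.Add(_chunk.ToArray());
        }
        finally
        {
            if (lockTaken) _spinLock.Exit(useMemoryBarrier: true);
        }
        _queue.CompleteAdding();
    }

    public IEnumerable<int> GetConsumingEnumerable()
    {
        foreach (var chunk in _queue.GetConsumingEnumerable())
            foreach (var item in chunk)
                yield return item;
    }
}
```

SpinLock tends to lose to Monitor under contention with non-trivial critical sections (here, ToArray allocates inside the lock), which is consistent with the worse performance reported above.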

Update: I implemented the lock-free solution suggested by Nick (https://stackoverflow.com/users/383426/nick), making heavy use of the Interlocked class. In a configuration with one producer the performance is slightly better, but it becomes much worse with two or more producers. There are frequent collisions that cause the threads to spin. The implementation is also very tricky, making it easy to introduce bugs.

public class ChunkyBlockingCollection3<T>
{
    private readonly BlockingCollection<(T[], int)> _blockingCollection;
    public readonly int _chunkSize;
    private T[] _array;
    private int _arrayCount;
    private int _arrayCountOfCompleted;
    private T[] _emptyArray;

    public ChunkyBlockingCollection3(int chunkSize)
    {
        _chunkSize = chunkSize;
        _blockingCollection = new BlockingCollection<(T[], int)>();
        _array = new T[chunkSize];
        _arrayCount = 0;
        _arrayCountOfCompleted = 0;
        _emptyArray = new T[chunkSize];
    }

    public void Add(T item)
    {
        while (true) // Spin
        {
            int count = _arrayCount;
            while (true) // Spin
            {
                int previous = count;
                count++;
                int result = Interlocked.CompareExchange(ref _arrayCount,
                    count, previous);
                if (result == previous) break;
                count = result;
            }
            var array = Interlocked.CompareExchange(ref _array, null, null);
            if (array == null) throw new InvalidOperationException(
                    "The collection has been marked as complete.");
            if (count <= _chunkSize)
            {
                // There is empty space in the array
                array[count - 1] = item;
                Interlocked.Increment(ref _arrayCountOfCompleted);
                break; // Adding is completed
            }
            if (count == _chunkSize + 1)
            {
                // Array is full. Push it to the BlockingCollection.
                while (Interlocked.CompareExchange(
                    ref _arrayCountOfCompleted, 0, 0) < _chunkSize) { } // Spin
                _blockingCollection.Add((array, _chunkSize));
                T[] newArray;
                while ((newArray = Interlocked.CompareExchange(
                    ref _emptyArray, null, null)) == null) { } // Spin
                Interlocked.Exchange(ref _array, newArray);
                Interlocked.Exchange(ref _emptyArray, null);
                Interlocked.Exchange(ref _arrayCountOfCompleted, 0);
                Interlocked.Exchange(ref _arrayCount, 0); // Unlock other threads
                Interlocked.Exchange(ref _emptyArray, new T[_chunkSize]);
            }
            else
            {
                // Wait other thread to replace the full array with a new one.
                while (Interlocked.CompareExchange(
                    ref _arrayCount, 0, 0) > _chunkSize) { } // Spin
            }
        }
    }

    public void CompleteAdding()
    {
        var array = Interlocked.Exchange(ref _array, null);
        if (array != null)
        {
            int count = Interlocked.Exchange(ref _arrayCount, -1);
            while (Interlocked.CompareExchange(
                ref _arrayCountOfCompleted, 0, 0) < count) { } // Spin
            _blockingCollection.Add((array, count));
            _blockingCollection.CompleteAdding();
        }
    }

    public IEnumerable<T> GetConsumingEnumerable()
    {
        foreach (var (array, count) in _blockingCollection.GetConsumingEnumerable())
        {
            for (int i = 0; i < count; i++)
            {
                yield return array[i];
            }
        }
    }
}

Answer


You can try using an array for _chunk instead of a List<T>. Then you can use Interlocked.Increment to increment the next index to populate on Add, and when the count exceeds the size of your chunk, move it all to the blocking collection and reset the index (in a lock, of course).
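A sketch of this suggestion might look as follows (an int payload and assumed names; this is one reading of the answer, not code from it). Each Add claims a slot with Interlocked.Increment; the thread that fills the last slot waits for the earlier writers to finish, pushes the array, and resets the counters, while overshooting threads spin and retry. This boundary coordination is exactly where ChunkyBlockingCollection3 above runs into collisions with multiple producers.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var collection = new InterlockedChunkCollection(chunkSize: 64);
var producers = Enumerable.Range(0, 4)
    .Select(_ => Task.Run(() => { for (int i = 0; i < 2_500; i++) collection.Add(1); }))
    .ToArray();
Task.WaitAll(producers);
collection.CompleteAdding();
long total = collection.GetConsumingEnumerable().Sum(x => (long)x);
Console.WriteLine(total); // 10000

class InterlockedChunkCollection
{
    private readonly BlockingCollection<int[]> _queue = new();
    private readonly int _chunkSize;
    private int[] _array;
    private int _next;    // next slot index to claim
    private int _written; // how many of the slots 0.._chunkSize-2 are written

    public InterlockedChunkCollection(int chunkSize)
    {
        _chunkSize = chunkSize;
        _array = new int[chunkSize];
    }

    public void Add(int item)
    {
        while (true)
        {
            int i = Interlocked.Increment(ref _next) - 1; // claim a slot
            if (i < _chunkSize - 1)
            {
                _array[i] = item;
                Interlocked.Increment(ref _written);
                return;
            }
            if (i == _chunkSize - 1)
            {
                // We claimed the last slot, so we own the swap. Wait until
                // every earlier claimed slot has been written, then push.
                _array[i] = item;
                while (Volatile.Read(ref _written) < _chunkSize - 1)
                    Thread.SpinWait(1);
                _queue.Add(_array);
                _array = new int[_chunkSize];
                Volatile.Write(ref _written, 0);
                Volatile.Write(ref _next, 0); // release overshooting threads
                return;
            }
            // i >= _chunkSize: another thread is swapping; wait, then retry.
            while (Volatile.Read(ref _next) >= _chunkSize)
                Thread.SpinWait(1);
        }
    }

    public void CompleteAdding() // assumed: called once, after all producers
    {
        int count = Math.Min(Volatile.Read(ref _next), _chunkSize);
        if (count > 0)
        {
            var tail = new int[count];
            Array.Copy(_array, tail, count);
            _queue.Add(tail);
        }
        _queue.CompleteAdding();
    }

    public IEnumerable<int> GetConsumingEnumerable()
    {
        foreach (var chunk in _queue.GetConsumingEnumerable())
            foreach (var item in chunk)
                yield return item;
    }
}
```

Note that every thread crossing a chunk boundary spins until the swap completes, so throughput degrades as producer count grows, matching the behavior observed in the update above.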

