在C#/.NET中为生产者/消费者实现异步流 [英] Implementing async stream for producer/cosumer in C# / .NET

查看:121
本文介绍了在C#/.NET中为生产者/消费者实现异步流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有一个lib将其结果输出到给定的Stream对象中.我想在lib完成之前开始使用结果. Stream应该被阻塞以简化用法,并在生产者超前运行时避免过多的内存消耗;线程安全以允许生产者和消费者独立存在.

There is a lib that outputs its results into a given Stream object. I would like to begin consuming the results before the lib is done. The Stream should be blocking to simplify usage and avoid excessive memory consumption if producer runs ahead too far; thread safe to allow independent existence of producer and consumer.

库完成后,生产者线程应关闭流,从而通知消费者没有更多数据.

Once the lib finishes, the producer thread should close the stream, hence notifying consumer that there is no more data.

我当时在考虑使用NetworkStream或PipeStream(匿名),但是它们都可能通过内核发送数据时速度很慢.

I was thinking of using NetworkStream or PipeStream (anonymous), but both are probably slow as they send data through kernel.

有什么建议吗?


var stream = new AsyncBlockingBufferedStream();

void ProduceData() { // In producer thread externalLib.GenerateData(stream); stream.Close(); }

void ProduceData() { // In producer thread externalLib.GenerateData(stream); stream.Close(); }

void ConsumeData() { // In consumer thread int read; while ((read = stream.Read(...)) != 0) { ... } }

void ConsumeData() { // In consumer thread int read; while ((read = stream.Read(...)) != 0) { ... } }

推荐答案

基于克里斯·泰勒(Chris Taylor)的先前回答,这是我自己进行的修改,具有更快的基于块的操作和更正的写入完成通知.它现在被标记为Wiki,因此您可以对其进行更改.

Based on the previous answer by Chris Taylor, here's my own, revised, with much faster block based operations and corrected write completion notifications. It's marked as wiki now, so you can change it.

public class BlockingStream : Stream
{
    private readonly BlockingCollection<byte[]> _blocks;
    private byte[] _currentBlock;
    private int _currentBlockIndex;

    public BlockingStream(int streamWriteCountCache)
    {
        _blocks = new BlockingCollection<byte[]>(streamWriteCountCache);
    }

    public override bool CanTimeout { get { return false; } }
    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override void Flush() {}
    public long TotalBytesWritten { get; private set; }
    public int WriteCount { get; private set; }

    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        throw new NotSupportedException();
    }

    public override void SetLength(long value)
    {
        throw new NotSupportedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        ValidateBufferArgs(buffer, offset, count);

        int bytesRead = 0;
        while (true)
        {
            if (_currentBlock != null)
            {
                int copy = Math.Min(count - bytesRead, _currentBlock.Length - _currentBlockIndex);
                Array.Copy(_currentBlock, _currentBlockIndex, buffer, offset + bytesRead, copy);
                _currentBlockIndex += copy;
                bytesRead += copy;

                if (_currentBlock.Length <= _currentBlockIndex)
                {
                    _currentBlock = null;
                    _currentBlockIndex = 0;
                }

                if (bytesRead == count)
                    return bytesRead;
            }

            if (!_blocks.TryTake(out _currentBlock, Timeout.Infinite))
                return bytesRead;
        }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        ValidateBufferArgs(buffer, offset, count);

        var newBuf = new byte[count];
        Array.Copy(buffer, offset, newBuf, 0, count);
        _blocks.Add(newBuf);
        TotalBytesWritten += count;
        WriteCount++;
    }

    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);
        if (disposing)
        {
            _blocks.Dispose();
        }
    }

    public override void Close()
    {
        CompleteWriting();
        base.Close();
    }

    public void CompleteWriting()
    {
        _blocks.CompleteAdding();
    }

    private static void ValidateBufferArgs(byte[] buffer, int offset, int count)
    {
        if (buffer == null)
            throw new ArgumentNullException("buffer");
        if (offset < 0)
            throw new ArgumentOutOfRangeException("offset");
        if (count < 0)
            throw new ArgumentOutOfRangeException("count");
        if (buffer.Length - offset < count)
            throw new ArgumentException("buffer.Length - offset < count");
    }
}

这篇关于在C#/.NET中为生产者/消费者实现异步流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆