Consuming a custom stream (IEnumerable<T>)


Question

I'm using a custom Stream implementation that streams an IEnumerable<T> into a stream. I'm using this EnumerableStream implementation to perform the conversion.

I'm using it to stream over WCF in streaming mode. I can convert the IEnumerable to a stream without any problem. Once I'm on the client side, I can deserialize and get all the data; however, I can't find a condition that stops the loop over my stream. I get:

System.Runtime.Serialization.SerializationException: End of Stream encountered before parsing was completed.

Here's a sample of what I'm trying to achieve:

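using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.Serialization.Formatters.Binary;

class Program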
{
    public static void Main()
    {
        var ListToSend = new List<List<string>>();
        var ListToReceive = new List<List<string>>();
        ListToSend = SimulateData().ToList();
        using (Stream stream = GetStream(ListToSend))
        {
            var formatter = new BinaryFormatter();
            while (true) // stream.CanRead? What should I put here to stop once everything has been read?
            {
                List<string> row = formatter.Deserialize(stream) as List<string>;
                ListToReceive.Add(row);
            }
            Printer(ListToReceive);
            Console.WriteLine("Done");
        }
    }

    private static void Printer(List<List<string>> data)
    {
        Console.WriteLine("Printing");
        foreach (var row in data)
        {
            foreach (var cell in row)
            {
                Console.Write(cell + "\t");
            }
            Console.WriteLine("-------------------------------------------------------------------------------");
        }
    }
    private static Stream GetStream(IEnumerable<List<string>> data)
    {
        return EnumerableStream.Create(data, DeserializerCallback);
    }

    private static List<byte> DeserializerCallback(object obj)
    {
        var binFormatter = new BinaryFormatter();
        var mStream = new MemoryStream();
        binFormatter.Serialize(mStream, obj);
        return mStream.ToArray().ToList();
    }

    private static IEnumerable<List<string>> SimulateData()
    {
        Random randomizer = new Random();
        for (var i = 0; i < 10; i++)
        {
            var row = new List<string>();
            for (var j = 0; j < 1000; j++)
            {
                row.Add((randomizer.Next(100)).ToString());
            }
            yield return row;
        }
    }
}

I did not include the custom stream. I created a fiddle for those who want to see the entire code.

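  • Do I need to add something in the custom stream itself to signal that all the data has been read?
  • Is it because the deserializer and the serializer don't use the same format? (I don't think so.)
  • I also want to know why, when I put a breakpoint in the Read function, the buffer size changes randomly.
  • I would prefer not to wrap the code in a try/catch; I want a clean solution that does not crash.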

Answer

Do I need to add something in the custom stream itself to notify that all the data have been read?

You can, but that wouldn't help in the WCF scenario, where the received Stream is an instance of a different class.

There are two standard (official, by design) ways of determining the end of the Stream data:

(1) ReadByte returns -1

Returns

The unsigned byte cast to an Int32, or -1 if at the end of the stream.

(2) Read returns 0 when called with count > 0

Returns

The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero (0) if the end of the stream has been reached.

Unfortunately, both of them consume the current byte (they advance to the next one) whenever the stream is not at its end, which breaks the deserializer.
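To make the two signals concrete, here is a minimal sketch that uses a MemoryStream in place of the custom stream. The class and method names (EndOfStreamDemo, Probe) are hypothetical and exist only for this illustration.

```csharp
using System;
using System.IO;

public static class EndOfStreamDemo
{
    // Probes a stream with ReadByte, drains the rest with Read,
    // then checks what both methods report once the end is reached.
    public static (int first, int drained, int readAtEnd, int readByteAtEnd) Probe(byte[] data)
    {
        using (var stream = new MemoryStream(data))
        {
            int first = stream.ReadByte();      // consumes the first byte (or returns -1 if empty)
            var buffer = new byte[16];
            int drained = 0, n;
            while ((n = stream.Read(buffer, 0, buffer.Length)) > 0)
                drained += n;                   // Read returns 0 only at the end of the stream
            int readAtEnd = stream.Read(buffer, 0, buffer.Length);
            int readByteAtEnd = stream.ReadByte();
            return (first, drained, readAtEnd, readByteAtEnd);
        }
    }

    public static void Main()
    {
        var result = Probe(new byte[] { 10, 20, 30 });
        Console.WriteLine(result);
    }
}
```

For the three-byte input, the probe itself returns 10, so only two bytes are left to drain; a deserializer started after such a probe would begin at the second byte. At the end of the stream, Read reports 0 and ReadByte reports -1.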

What are the possible solutions?

First, you can implement a serialization/deserialization format (protocol) that lets you know whether there are more elements to deserialize. For instance, List<T> stores Count before the elements, T[] stores Length before the elements, and so on. Since EnumerableStream<T> does not know the count in advance, one simple solution is to emit a single fake byte before each element:

private bool SerializeNext()
{
    if (!_source.MoveNext())
        return false;

    _buf.Enqueue(1); // <-- marker byte: another element follows
    foreach (var b in _serializer(_source.Current))
        _buf.Enqueue(b);

    return true;
}

This would allow you to use

while (stream.ReadByte() != -1)
{
    // ...
}
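The marker-byte idea can be tried end to end without WCF or the custom stream. The sketch below is an assumption-laden stand-in: it uses BinaryWriter/BinaryReader over a MemoryStream instead of EnumerableStream and BinaryFormatter, and the names MarkerFramingDemo, WriteRows and ReadRows are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class MarkerFramingDemo
{
    // Writes each row as: marker byte (1), then the row payload.
    // BinaryWriter is only a stand-in for the real serializer callback.
    public static byte[] WriteRows(IEnumerable<List<string>> rows)
    {
        using (var ms = new MemoryStream())
        using (var writer = new BinaryWriter(ms))
        {
            foreach (var row in rows)
            {
                writer.Write((byte)1);      // marker: another element follows
                writer.Write(row.Count);    // payload: cell count, then the cells
                foreach (var cell in row)
                    writer.Write(cell);
            }
            writer.Flush();
            return ms.ToArray();
        }
    }

    // Reads rows until ReadByte reports the end of the stream (-1) instead of a marker.
    public static List<List<string>> ReadRows(Stream stream)
    {
        var rows = new List<List<string>>();
        using (var reader = new BinaryReader(stream))
        {
            while (stream.ReadByte() != -1) // consumes only the marker, never payload bytes
            {
                int count = reader.ReadInt32();
                var row = new List<string>(count);
                for (int i = 0; i < count; i++)
                    row.Add(reader.ReadString());
                rows.Add(row);
            }
        }
        return rows;
    }

    public static void Main()
    {
        var sent = new List<List<string>>
        {
            new List<string> { "1", "2" },
            new List<string> { "3" },
        };
        var received = ReadRows(new MemoryStream(WriteRows(sent)));
        Console.WriteLine(received.Count);
    }
}
```

Because the byte consumed by ReadByte is always a marker, the payload reader starts at the right position every time, and the loop ends cleanly without catching an exception.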

Second, if you want to keep the current format, a more general solution is to implement a custom stream that wraps another stream and adds a PeekByte method with the same semantics as the standard ReadByte, but without consuming the current byte:

using System;
using System.IO;

public class SequentialStream : Stream
{
    private Stream source;
    private bool leaveOpen;
    private int? nextByte;

    public SequentialStream(Stream source, bool leaveOpen = false)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (!source.CanRead) throw new ArgumentException("Non readable source.", nameof(source));
        this.source = source;
        this.leaveOpen = leaveOpen;
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing && !leaveOpen)
            source.Dispose();
        base.Dispose(disposing);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    public int PeekByte()
    {
        if (nextByte == null)
            nextByte = source.ReadByte();
        return nextByte.Value;
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (count <= 0) return 0;
        if (nextByte != null)
        {
            if (nextByte.Value < 0) return 0;
            buffer[offset] = (byte)nextByte.Value;
            if (count > 1)
            {
                int read = source.Read(buffer, offset + 1, count - 1);
                if (read == 0)
                    nextByte = -1;
                else
                    nextByte = null;
                return read + 1;
            }
            else
            {
                nextByte = null;
                return 1;
            }
        }
        else
        {
            int read = source.Read(buffer, offset, count);
            if (read == 0)
                nextByte = -1;
            return read;
        }
    }
} 

This basically implements a read-only, forward-only stream with 0 or 1 byte of read-ahead.

Usage looks like this:

using (var stream = new SequentialStream(GetStream(ListToSend)))
{
    // ...
    while (stream.PeekByte() != -1) 
    {
        // ...
    }
    // ...
}

P.S. As for

I also want to know why when I put a break point in the read function, the buffer size is changing randomly.

It's not random. BinaryFormatter internally uses BinaryReader to read typed values such as Int32, Byte, String, etc., passing the desired size as count: for example 4, 1, or the number of encoded bytes of a string (which it knows because that number is stored in the stream before the actual data and is read before the data itself).
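One way to observe this is to wrap a stream in a small logger that records the count passed to each Read call. The sketch below uses BinaryReader directly rather than BinaryFormatter, and CountLoggingStream and ReadCountDemo are hypothetical names. The exact read pattern is an implementation detail of the runtime, so treat the observed values as illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical read-only wrapper that records the count requested by each Read call.
public class CountLoggingStream : Stream
{
    private readonly Stream _inner;
    public List<int> RequestedCounts { get; } = new List<int>();

    public CountLoggingStream(Stream inner) { _inner = inner; }

    public override int Read(byte[] buffer, int offset, int count)
    {
        RequestedCounts.Add(count);   // what the caller asked for, not what was returned
        return _inner.Read(buffer, offset, count);
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;
    public override long Length => throw new NotSupportedException();
    public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
}

public static class ReadCountDemo
{
    // Writes an Int32 and a Byte, reads them back through the logger,
    // and returns the counts that BinaryReader requested.
    public static List<int> Run()
    {
        var ms = new MemoryStream();
        using (var writer = new BinaryWriter(ms, Encoding.UTF8, leaveOpen: true))
        {
            writer.Write(42);         // Int32: 4 bytes
            writer.Write((byte)7);    // Byte: 1 byte
        }
        ms.Position = 0;

        var logging = new CountLoggingStream(ms);
        using (var reader = new BinaryReader(logging))
        {
            reader.ReadInt32();
            reader.ReadByte();
        }
        return logging.RequestedCounts;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

The logged counts should follow the sizes of the typed values being read (4 for the Int32, 1 for the Byte), which is why the count seen at a breakpoint in Read keeps changing.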
