从阅读的NetworkStream破坏了缓冲区 [英] Reading from NetworkStream corrupts the buffer

查看:159
本文介绍了从阅读的NetworkStream破坏了缓冲区的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

当从使用的NetworkStream读取ReadUntilClosedObservable1数据,返回的数据被损坏像读取数据重叠的部分区块。

然而,当我与ReadUntilClosedObservable2读取数据的数据到达没有问题。

我想使用的ReadUntilClosedObservable1因为流反复阅读ReadUntilClosedObservable2是燃烧的CPU。

我怎样才能获得的消息同步顺序?

更新:

 返回Observable.Timer(TimeSpan.Zero,间隔,TaskPoolScheduler.Default)
                    .SelectMany(_ => ReadToEnd的)
                    。凡(dataChunk = GT; dataChunk.Length大于0);

我只注意到 ReadToEnd的它完成了previous作业之前一次又一次被解雇。难道不是需要同步?如果 Observable.Timer 是我怎么能达到同样的效果没有它,阅读的时间间隔,但无需等待启动的问题?

 公共静态的IObservable< INT> ReadObservable(此流流,字节[]缓冲区
                                              ,诠释抵消,诠释计数)
{
    返回stream.ReadAsync(缓冲区,偏移,计数)
                    .ToObservable();
}公共静态的IObservable<字节[]> ReadObservable(此流流,
                                                 INT缓冲区大小)
{
    VAR缓冲=新的字节[缓冲区大小]    返回stream.ReadObservable(缓冲液,0,buffer.Length)
                    。选择(cbRead =>
                                {
                                    如果(cbRead == 0)
                                    {
                                        返回新的[0]字节;
                                    }                                    如果(cbRead == buffer.Length)
                                    {
                                        返回缓冲区;
                                    }                                    VAR dataChunk =新的字节[cbRead]                                    Buffer.BlockCopy(缓冲液,0,dataChunk,
                                                     0,cbRead);                                    返回dataChunk;
                                });
}公共静态的IObservable<字节[]> ReadUntilClosedObservable1(这的NetworkStream
                                     流缓冲区大小INT,时间跨度间隔)
{
    VAR为ReadToEnd = Observable.Defer(()=> stream.ReadObservable(缓冲区大小))
                                .DoWhile(()=> stream.DataAvailable)
                                .ToList()
                                。选择(dataChunks =>
                                    {
                                        VAR缓冲=新的List<位>();                                        的foreach(在dataChunks VAR dataChunk)
                                        {
                                            buffer.AddRange(dataChunk);
                                        }                                        返回buffer.ToArray();
                                    });    返回Observable.Timer(TimeSpan.Zero,间隔,TaskPoolScheduler.Default)
                        .SelectMany(_ => ReadToEnd的)
                        。凡(dataChunk = GT; dataChunk.Length大于0);
}公共静态的IObservable<字节[]> ReadUntilClosedObservable2(此流流
                                                             ,诠释缓冲区大小)
{
    返回Observable.Defer(()=> stream.ReadObservable(缓冲区大小))
                        。重复()
                        。凡(dataChunk = GT; dataChunk.Length大于0);
}


解决方案

哦,不,不......不要做这样的...

异步和RX是更多...的非直观的设置,以获得工作之一,但它比你尝试什么简单一些。关键位是三个不同的运营商的Rx:


  • FromAsyncPattern :从一个异步调用签名生成一个的IObservable 工厂

  • Observable.Defer :允许您使用上述的IObservable 工厂产生的每用户观测

  • Observable.While :让你重新调用,直到我说,当上的的IObservable

(编辑:改变使用的NetworkStream 为例)

(doubleEDIT:改变了基于评论)

试试这个 - 除非我想念我的猜测,它或多或少什么你想要的:

 无效的主要()
{
    //我们会养活这对听众
    VAR消息=哟妈妈说你喜欢这样的消息;    VAR listenerTask =任务
        。厂
        .StartNew(()=>
            {
                VAR缓冲区大小= 1024;
                VAR本地主机=新ip地址(新的byte [] {127,0,0,1});
                VAR监听器=新的TcpListener(本地主机,11201);
                listener.Start();
                变种incomingClient = listener.AcceptTcpClient();
                变种clientStream = incomingClient.GetStream();
                //我们的读者缓冲
                VAR观察者= clientStream.ReadObservable(缓冲区大小);
                VAR compareBuffer =观察者
                    //虽然我们获取数据和客户端以
                    //仍处于连接
                    .TakeWhile(returnBuffer = GT; returnBuffer.Length大于0&放大器;&放大器;
                        incomingClient.Connected)
                    //在读取块之间,响应返回给客户端
                    //无需花哨这里,只是正常的回写异步
                    。做(returnBuffer => clientStream.BeginWrite(
                             returnBuffer,
                             0,
                             returnBuffer.Length,
                             AR => clientStream.EndWrite(AR),
                             空值))
                    .ToEnumerable()
                    .SelectMany(returnBuffer => returnBuffer)
                    .ToArray();
                listener.Stop();
                Console.WriteLine(
                     监听器认为它被告知... {0},
                     Encoding.ASCII.GetString(compareBuffer));
            });    VAR clientTask = Task.Factory.StartNew(
        ()=>
        {
            VAR的客户=新的TcpClient();
            client.Connect(本地主机,11201);
            VAR随机=新的随机();
            变种outStream = client.GetStream();
            VAR bytesToSend = Encoding.ASCII.GetBytes(消息);
            的foreach(在bytesToSend字节toSend)
            {
                //在发送一个字符
                outStream.WriteByte(toSend);                //监听器应该人云亦云我们...
                INT打手= outStream.ReadByte();
                如果(呆子!= toSend)
                {
                    Console.WriteLine(
                        咦监听呼应错了,我说:{0},他们说,{1},
                         发送,
                         继续);
                    打破;
                }
                Console.WriteLine(我说:{0},他们说,{1},toSend,呆子);                //以一个小盹(模拟延迟等)
                Thread.sleep代码(random.Next(200));
            }
            client.Close();
        });    Task.WaitAll(listenerTask,clientTask);
}
公共静态类分机
{
    公共静态的IObservable<字节[]> ReadObservable(此流流,INT缓冲区大小)
    {
        //保存读取数据
        VAR缓冲=新的字节[缓冲区大小]
        //第1步:异步签名=>观察到厂
        VAR异步读取= Observable.FromAsyncPattern<字节[],INT,INT,INT>(
            stream.BeginRead,
            stream.EndRead);
        返回Observable.While(
            //而存在要读出的数据
            ()=> stream.CanRead,
            //反复调用观察到的工厂,这将
            //重建以使其将从目前开始
            //流位置 - 因此0偏移
            Observable.Defer(()=>异步读取(缓冲液,0,缓冲区大小))
                。选择(的ReadBytes = GT; buffer.Take(的ReadBytes).ToArray()));
    }
}

When reading data from NetworkStream with ReadUntilClosedObservable1, the returned data is corrupted like some blocks of read data overlap.

However, when I read the data with ReadUntilClosedObservable2 the data arrives without problems.

I want to use the ReadUntilClosedObservable1 because repeatedly reading from stream in ReadUntilClosedObservable2 is burning the CPU.

How can I get the messages in sync order?

UPDATE:

return Observable.Timer(TimeSpan.Zero, interval, TaskPoolScheduler.Default)
                    .SelectMany(_ => readToEnd)
                    .Where(dataChunk => dataChunk.Length > 0);

I just noticed that readToEnd to is fired again and again before it finishes the previous job. Doesn't it need to be synchronized? If Observable.Timer is the problem how can I achieve the same effect without it, reading in intervals but starting without waiting?

public static IObservable<int> ReadObservable(this Stream stream, byte[] buffer
                                              ,int offset, int count)
{
    return stream.ReadAsync(buffer, offset, count)
                    .ToObservable();
}

public static IObservable<byte[]> ReadObservable(this Stream stream,
                                                 int bufferSize)
{
    var buffer = new byte[bufferSize];

    return stream.ReadObservable(buffer, 0, buffer.Length)
                    .Select(cbRead =>
                                {
                                    if (cbRead == 0)
                                    {
                                        return new byte[0];
                                    }

                                    if (cbRead == buffer.Length)
                                    {
                                        return buffer;
                                    }

                                    var dataChunk = new byte[cbRead];

                                    Buffer.BlockCopy(buffer, 0, dataChunk,
                                                     0, cbRead);

                                    return dataChunk;
                                });
}

public static IObservable<byte[]> ReadUntilClosedObservable1(this NetworkStream
                                     stream, int bufferSize, TimeSpan interval)
{
    var readToEnd = Observable.Defer(() => stream.ReadObservable(bufferSize))
                                .DoWhile(() => stream.DataAvailable)
                                .ToList()
                                .Select(dataChunks =>
                                    {
                                        var buffer = new List<byte>();

                                        foreach (var dataChunk in dataChunks)
                                        {
                                            buffer.AddRange(dataChunk);
                                        }

                                        return buffer.ToArray();
                                    });

    return Observable.Timer(TimeSpan.Zero, interval, TaskPoolScheduler.Default)
                        .SelectMany(_ => readToEnd)
                        .Where(dataChunk => dataChunk.Length > 0);
}

public static IObservable<byte[]> ReadUntilClosedObservable2(this Stream stream
                                                             ,int bufferSize)
{
    return Observable.Defer(() => stream.ReadObservable(bufferSize))
                        .Repeat()
                        .Where(dataChunk => dataChunk.Length > 0);
}

解决方案

Oh, no, no...don't do it like that...

Async + Rx is one of the more...non-intuitive setups to get working, but it is quite a bit simpler than what you're attempting. The key bits are three different Rx operators:

  • FromAsyncPattern: generates a "IObservable Factory" from an async call signature
  • Observable.Defer: allows you to use above IObservable factory to generate observables per subscriber
  • Observable.While: allows you to "Reinvoke until I say when" on an IObservable

(EDIT: altered to use a NetworkStream example)

(doubleEDIT: altered based on comments)

Try this - unless I miss my guess, it's more or less what you're trying for:

void Main()
{    
    // We'll feed this to listener
    var message = "Yo mamma said you like messages like this";

    var listenerTask = Task
        .Factory
        .StartNew(() => 
            {
                var bufferSize = 1024;
                var localhost = new IPAddress(new byte[]{127,0,0,1});
                var listener = new TcpListener(localhost, 11201);
                listener.Start();
                var incomingClient = listener.AcceptTcpClient();
                var clientStream = incomingClient.GetStream();
                // our buffered reader
                var observer = clientStream.ReadObservable(bufferSize);
                var compareBuffer = observer
                    // Take while we're getting data and the client
                    // is still connected
                    .TakeWhile(returnBuffer => returnBuffer.Length > 0 && 
                        incomingClient.Connected)
                    // In between read blocks, respond back to the client
                    // No need for fanciness here, just normal async writeback
                    .Do(returnBuffer => clientStream.BeginWrite(
                             returnBuffer, 
                             0, 
                             returnBuffer.Length, 
                             ar => clientStream.EndWrite(ar), 
                             null))
                    .ToEnumerable()
                    .SelectMany (returnBuffer => returnBuffer)
                    .ToArray();
                listener.Stop();
                Console.WriteLine(
                     "Listener thinks it was told... {0}", 
                     Encoding.ASCII.GetString(compareBuffer));
            });

    var clientTask = Task.Factory.StartNew(
        () => 
        {
            var client = new TcpClient();
            client.Connect("localhost", 11201);
            var random = new Random();
            var outStream = client.GetStream();
            var bytesToSend = Encoding.ASCII.GetBytes(message);
            foreach(byte toSend in bytesToSend)
            {
                // send a character over
                outStream.WriteByte(toSend);

                // Listener should parrot us...
                int goOn = outStream.ReadByte();
                if(goOn != toSend)
                {
                    Console.WriteLine(
                        "Huh. Listener echoed wrong. I said: {0}, they said {1}", 
                         toSend, 
                         goOn);
                    break;
                }
                Console.WriteLine("I said: {0}, they said {1}", toSend, goOn);

                // Take a little nap (simulate latency, etc)
                Thread.Sleep(random.Next(200));
            }
            client.Close();
        });

    Task.WaitAll(listenerTask, clientTask);
}


public static class Ext
{        
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
    {        
        // to hold read data
        var buffer = new byte[bufferSize];
        // Step 1: async signature => observable factory
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead, 
            stream.EndRead);
        return Observable.While(
            // while there is data to be read
            () => stream.CanRead, 
            // iteratively invoke the observable factory, which will
            // "recreate" it such that it will start from the current
            // stream position - hence "0" for offset
            Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
                .Select(readBytes => buffer.Take(readBytes).ToArray()));
    }
}

这篇关于从阅读的NetworkStream破坏了缓冲区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆