What is a good method to handle line-based network I/O streams?

Question

Note: Let me apologize for the length of this question; I had to put a lot of information into it. I hope that doesn't cause too many people to simply skim it and make assumptions. Please read it in its entirety. Thanks.

I have a stream of data coming in over a socket. This data is line oriented.

I am using the APM (Asynchronous Programming Model) of .NET (BeginRead, etc.). This precludes using stream-based I/O because async I/O is buffer based. It is possible to repackage the data and send it to a stream, such as a MemoryStream, but there are issues there as well.

The problem is that my input stream (which I have no control over) doesn't give me any information on how long the stream is. It is simply a stream of newline-terminated lines that looks like this:

COMMAND

...Unpredictable number of lines of data...

END COMMAND

....repeat....

So, using APM, and since I don't know how long any given data set will be, it is likely that blocks of data will cross buffer boundaries and require multiple reads, but those multiple reads will also span multiple blocks of data.

Example:

Byte buffer[1024] = ".................blah\nThis is another l"
[another read]
                    "ine\n.............................More Lines..."

My first thought was to use a StringBuilder and simply append the buffer lines to the SB. This works to some extent, but I found it difficult to extract blocks of data. I tried using a StringReader to read newline-terminated data, but there was no way to know whether I was getting a complete line or not, as StringReader returns a partial line at the end of the last block added and then returns null afterwards. There isn't a way to know whether what was returned was a full, newline-terminated line of data.

Example:

// Note: no newline at the end
StringBuilder sb = new StringBuilder("This is a line\nThis is incomp..");
StringReader sr = new StringReader(sb.ToString()); // StringReader takes a string, not a StringBuilder
string s = sr.ReadLine(); // returns "This is a line"
s = sr.ReadLine();        // returns "This is incomp.."
s = sr.ReadLine();        // returns null

What's worse is that if I just keep appending data, the buffers get bigger and bigger, and since this could run for weeks or months at a time, that's not a good solution.

My next thought was to remove blocks of data from the SB as I read them. This required writing my own ReadLine function, but then I'm stuck locking the data during reads and writes. Also, the larger blocks of data (which can consist of hundreds of reads and megabytes of data) require scanning the entire buffer looking for newlines. It's inefficient and pretty ugly.

I'm looking for something that has the simplicity of a StreamReader/Writer with the convenience of async I/O.

My next thought was to use a MemoryStream: write the blocks of data to the memory stream, then attach a StreamReader to the stream and use ReadLine. But again I have issues with knowing whether the last read in the buffer is a complete line or not, plus it's even harder to remove the "stale" data from the stream.

I also thought about using a thread with synchronous reads. This has the advantage that a StreamReader will always return a full line from ReadLine(), except in broken-connection situations. However, this has issues with canceling the connection, and certain kinds of network problems can result in hung blocking sockets for an extended period of time. I'm using async I/O because I don't want to tie up a thread for the life of the program blocking on data receive.

The connection is long lasting, and data will continue to flow over time. During the initial connection there is a large flow of data, and once that flow is done the socket remains open waiting for real-time updates. I don't know precisely when the initial flow has "finished", since the only way to know is that no more data is sent right away. This means I can't wait for the initial data load to finish before processing; I'm pretty much stuck processing "in real time" as it comes in.

So, can anyone suggest a good method to handle this situation in a way that isn't overly complicated? I really want this to be as simple and elegant as possible, but I keep coming up with more and more complicated solutions due to all the edge cases. I guess what I want is some kind of FIFO to which I can easily keep appending more data while at the same time popping data out of it that matches certain criteria (i.e., newline-terminated strings).

Recommended answer

That's quite an interesting question. The solution for me in the past has been to use a separate thread with synchronous operations, as you propose. (I managed to get around most of the problems with blocking sockets using locks and lots of exception handlers.) Still, using the built-in asynchronous operations is typically advisable as it allows for true OS-level async I/O, so I understand your point.

Well, I've gone and written a class that accomplishes what I believe you need (in a relatively clean manner, I would say). Let me know what you think.

using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

public class AsyncStreamProcessor : IDisposable
{
    protected StringBuilder _buffer;  // Buffer for unprocessed data.

    private bool _isDisposed = false; // True if object has been disposed

    public AsyncStreamProcessor()
    {
        _buffer = null;
    }

    public IEnumerable<string> Process(byte[] newData)
    {
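        // Note: replace ASCII with whatever encoding your data actually uses.
        // newData must hold only the bytes actually received (the count returned by EndRead),
        // not the whole receive buffer.
        // Because this method is an iterator, nothing runs until the result is enumerated,
        // so enumerate it fully on every call.
        // The trick here is to append an extra line break to the new data so that a line break
        // at the very end of the new data is recognised.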
        using(var newDataReader = new StringReader(Encoding.ASCII.GetString(newData) + Environment.NewLine))
        {
            // Read all lines from new data, returning all but the last.
            // The last line is guaranteed to be incomplete (or possibly complete except for the line break,
            // which will be processed with the next packet of data).
            string line, prevLine = null;
            while ((line = newDataReader.ReadLine()) != null)
            {
                if (prevLine != null)
                {
                    yield return (_buffer == null ? string.Empty : _buffer.ToString()) + prevLine;
                    _buffer = null;
                }
                prevLine = line;
            }

            // Store last incomplete line in buffer.
            if (_buffer == null)
                // Note: the (* 2) is a guess at the final length of the incomplete line,
                // so that the buffer does not have to be expanded in most/all situations.
                // Change it to whatever seems appropriate.
                _buffer = new StringBuilder(prevLine, prevLine.Length * 2);
            else
                _buffer.Append(prevLine);
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (!_isDisposed)
        {
            if (disposing)
            {
                // Dispose managed resources.
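                // Drop the reference; the garbage collector reclaims the buffer on its own
                // schedule, so no forced GC.Collect() is needed here.
                _buffer = null;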
            }

            // Dispose native resources.

            // Remember that object has been disposed.
            _isDisposed = true;
        }
    }
}

An instance of this class should be created for each NetworkStream, and the Process function should be called whenever new data is received (in the callback method for BeginRead, before you call the next BeginRead, I would imagine).
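To make the calling pattern concrete, here is a minimal sketch of a read loop that keeps one BeginRead outstanding and hands each received chunk to a line processor. The LinePump class, its constructor parameters, and the 1024-byte buffer size are hypothetical names and values chosen for illustration; they are not part of the class above. The processor is passed in as a delegate, so Process from an AsyncStreamProcessor instance can be supplied directly.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper (not part of the answer's class): keeps one BeginRead
// outstanding and forwards every completed line to a callback.
public sealed class LinePump
{
    private readonly Stream _stream;                             // e.g. a NetworkStream
    private readonly Func<byte[], IEnumerable<string>> _process; // e.g. processor.Process
    private readonly Action<string> _onLine;                     // called once per complete line
    private readonly byte[] _readBuffer = new byte[1024];        // size is an arbitrary choice

    public LinePump(Stream stream, Func<byte[], IEnumerable<string>> process, Action<string> onLine)
    {
        _stream = stream;
        _process = process;
        _onLine = onLine;
    }

    // Issues the next asynchronous read.
    public void Start()
    {
        _stream.BeginRead(_readBuffer, 0, _readBuffer.Length, OnRead, null);
    }

    private void OnRead(IAsyncResult ar)
    {
        int count;
        try
        {
            count = _stream.EndRead(ar);
        }
        catch (IOException) { return; }             // connection failed
        catch (ObjectDisposedException) { return; } // stream was closed locally

        if (count == 0) return;                     // remote side closed the connection

        // Copy only the bytes actually received; the rest of _readBuffer is stale.
        var chunk = new byte[count];
        Buffer.BlockCopy(_readBuffer, 0, chunk, 0, count);

        // Enumerate fully before the next BeginRead so the processor is never re-entered.
        foreach (string line in _process(chunk))
            _onLine(line);

        Start();
    }
}
```

Assuming `processor` is an AsyncStreamProcessor and `networkStream` is the connected stream, the loop would be started with `new LinePump(networkStream, processor.Process, Console.WriteLine).Start();`. Copying the received bytes into a right-sized array matters because Process decodes the whole array it is given.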

Note: I have only verified this code with test data, not actual data transmitted over the network. However, I wouldn't anticipate any differences...
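Two cases that simple test data can miss are a CR/LF pair split across two reads and a multi-byte character split across two reads. As far as I can tell, StringReader.ReadLine treats a lone "\r" as a line terminator, so the first case would produce a spurious empty line with the class above. The following is a minimal sketch of a variant that covers both cases, not a drop-in replacement: LineAssembler and Append are hypothetical names, and the sketch assumes lines end in "\n" or "\r\n" (never a bare "\r").

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical variant, not the class from the answer: splits on '\n' only and
// keeps decoder state between calls.
public sealed class LineAssembler
{
    private readonly Decoder _decoder;                              // remembers partial multi-byte characters
    private readonly StringBuilder _pending = new StringBuilder();  // text after the last '\n' seen so far

    public LineAssembler(Encoding encoding)
    {
        _decoder = encoding.GetDecoder();
    }

    // Returns the lines completed by this chunk. `count` is the number of valid
    // bytes in `data`, i.e. the value returned by EndRead.
    public List<string> Append(byte[] data, int count)
    {
        var chars = new char[_decoder.GetCharCount(data, 0, count)];
        int charCount = _decoder.GetChars(data, 0, count, chars, 0);

        var lines = new List<string>();
        int start = 0;
        for (int i = 0; i < charCount; i++)
        {
            if (chars[i] != '\n') continue;

            _pending.Append(chars, start, i - start);
            // Drop the '\r' of a CRLF pair, even if it arrived in an earlier chunk.
            if (_pending.Length > 0 && _pending[_pending.Length - 1] == '\r')
                _pending.Length--;

            lines.Add(_pending.ToString());
            _pending.Clear();
            start = i + 1;
        }

        // Keep the unterminated tail for the next call.
        _pending.Append(chars, start, charCount - start);
        return lines;
    }
}
```

Returning a List rather than using an iterator means the internal state is updated even if the caller ignores the result. The pending text only ever holds one partial line, so memory use stays bounded on a long-lived connection as long as individual lines are bounded.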

Also, a warning: the class is of course not thread-safe, but as long as BeginRead isn't executed again until after the current data has been processed (as I presume you are doing), there shouldn't be any problems.

Hope this works for you. Let me know if there are remaining issues and I will try to modify the solution to deal with them. (There could well be some subtlety of the question I missed, despite reading it carefully!)
