Reading data asynchronously from a NetworkStream with a huge number of packets


Question

In my application every packet starts with a 2-byte length. However, after some time the application starts receiving lengths less than zero. With a synchronous client everything works correctly, but it's too slow. I'm 100% sure everything is correct on the server.

Connect:

    public void Connect(IPAddress ip, int port)
    {
        tcpClient.Connect(ip, port);
        stream = tcpClient.GetStream();
        byte[] len_buffer = new byte[2];
        stream.BeginRead(len_buffer, 0, len_buffer.Length, OnDataRead, len_buffer);
    }

OnDataRead:

    private void OnDataRead(IAsyncResult ar)
    {
            byte[] len = ar.AsyncState as byte[];
            int length = BitConverter.ToInt16(len, 0);
            byte[] buffer = new byte[length];

            int remaining = length;
            int pos = 0;
            while (remaining != 0)
            {
                int add = stream.Read(buffer, pos, remaining);
                pos += add;
                remaining -= add;
            }
            Process(buffer);
            len = new byte[2];

            stream.EndRead(ar);
            stream.BeginRead(len, 0, len.Length, OnDataRead, len);
    }

Recommended answer

As far as I can see, you're mixing synchronous and asynchronous calls: the callback parses the header before calling EndRead and then blocks in stream.Read. That's bad practice.

What you want is something like this:

var header = ReadHeader(); // 2 bytes
var data = ReadData(header.DataSize);
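
Since the question is about NetworkStream, the two steps above (read the header, then read exactly that many bytes) could also be sketched with async/await directly on a Stream. This is only a minimal sketch under assumptions: the names `FramedStreamReader`, `ReadExactAsync` and `ReadPacketAsync` are hypothetical, and the 2-byte length is assumed to be unsigned and in the byte order `BitConverter` uses on the receiving machine.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original answer.
public static class FramedStreamReader
{
    // Reads exactly count bytes, or returns null if the stream ends first.
    public static async Task<byte[]> ReadExactAsync(Stream stream, int count)
    {
        var buffer = new byte[count];
        int offset = 0;
        while (offset < count)
        {
            // ReadAsync may return fewer bytes than requested, so keep looping.
            int read = await stream.ReadAsync(buffer, offset, count - offset);
            if (read == 0)
                return null; // the remote side closed the connection
            offset += read;
        }
        return buffer;
    }

    // Reads one packet: a 2-byte length header followed by the payload.
    public static async Task<byte[]> ReadPacketAsync(Stream stream)
    {
        byte[] header = await ReadExactAsync(stream, 2);
        if (header == null)
            return null;

        // Assumption: the length is an unsigned 16-bit value.
        int length = BitConverter.ToUInt16(header, 0);
        return await ReadExactAsync(stream, length);
    }
}
```

A receive loop would then call `await FramedStreamReader.ReadPacketAsync(stream)` repeatedly and stop when it returns null. Because every read is awaited, the header is never parsed before both of its bytes have actually arrived.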

I didn't use NetworkStream, but here's an example of my async SocketReader:

using System;
using System.Diagnostics;
using System.Net.Sockets;

public static class SocketReader
{
    // Keeps reading until count bytes have been read (or the socket is closed).
    private static void DoReadFromSocket(Socket socket, int bytesRead, int count, byte[] buffer, Action<ArraySegment<byte>> endRead)
    {
        // Start a BeginReceive.
        try
        {
            socket.BeginReceive(buffer, bytesRead, count - bytesRead, SocketFlags.None, (asyncResult) =>
            {
                // Get the bytes read.
                int read = 0;
                try
                {
                    // if this goes wrong, the read remains 0
                    read = socket.EndReceive(asyncResult);
                }
                catch (ObjectDisposedException) { }
                catch (Exception exception)
                {
                    Trace.TraceError(exception.Message);
                }


                // if zero bytes received, the socket isn't available anymore.
                if (read == 0)
                {
                    endRead(new ArraySegment<byte>(buffer, 0, 0));
                    return;
                }

                // increase the bytesRead, (position within the buffer)
                bytesRead += read;

                // if all bytes are read, call the endRead with the buffer.
                if (bytesRead == count)
                    // All bytes are read. Invoke callback.
                    endRead(new ArraySegment<byte>(buffer, 0, count));
                else
                    // if not all bytes received, start another BeginReceive.
                    DoReadFromSocket(socket, bytesRead, count, buffer, endRead);

            }, null);
        }
        catch (Exception exception)
        {
            Trace.TraceError(exception.Message);
            endRead(new ArraySegment<byte>(buffer, 0, 0));
        }
    }

    public static void ReadFromSocket(Socket socket, int count, Action<ArraySegment<byte>> endRead)
    {
        // read from socket, construct a new buffer.
        DoReadFromSocket(socket, 0, count, new byte[count], endRead);
    }

    public static void ReadFromSocket(Socket socket, int count, byte[] buffer, Action<ArraySegment<byte>> endRead)
    {
        // if you do have a buffer available, you can pass that one. (this way you do not construct new buffers for receiving and able to reuse buffers)

        // if the buffer is too small, raise an exception, the caller should check the count and size of the buffer.
        if (count > buffer.Length)
            throw new ArgumentOutOfRangeException(nameof(count));

        DoReadFromSocket(socket, 0, count, buffer, endRead);
    }
}

Usage:

SocketReader.ReadFromSocket(socket, 2, (headerData) =>
{
    if(headerData.Count == 0)
    {
        // nothing/closed
        return;
    }

    // Read the length of the data.
    int length = BitConverter.ToInt16(headerData.Array, headerData.Offset);

    SocketReader.ReadFromSocket(socket, length, (dataBufferSegment) =>
    {
        if(dataBufferSegment.Count == 0)
        {
            // nothing/closed
            return;
        }

        Process(dataBufferSegment);

        // extra: if you need a binaryreader..
        using(var stream = new MemoryStream(dataBufferSegment.Array, dataBufferSegment.Offset, dataBufferSegment.Count))
        using(var reader = new BinaryReader(stream))
        {
            var whatever = reader.ReadInt32();
        }
    });
});
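
One more thing worth checking, separately from the sync/async mixing: `BitConverter.ToInt16` is signed, so any length above 32767 would be decoded as a negative number. Whether the server ever sends such lengths is an assumption about the protocol, not something the question confirms. A minimal sketch of an unsigned decode (`LengthHeader.Decode` is a hypothetical helper name):

```csharp
using System;

// Hypothetical helper, not part of the original answer.
public static class LengthHeader
{
    // Decodes a 2-byte length prefix as an unsigned value (0..65535).
    // Assumption: the sender writes the length in the same byte order
    // that BitConverter uses on this machine.
    public static int Decode(byte[] header, int offset)
    {
        return BitConverter.ToUInt16(header, offset);
    }
}
```

For example, the header produced by `BitConverter.GetBytes((ushort)40000)` decodes to -25536 with `ToInt16`, but to 40000 with the helper above.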

You can optimize the receive buffer by passing in your own buffer (see the overloads).

Continuous receiving (reusing the receive buffer):

public class PacketReader
{
    private byte[] _receiveBuffer = new byte[2];

    // This will run until the socket is closed.    
    public void StartReceiving(Socket socket, Action<ArraySegment<byte>> process)
    {
        SocketReader.ReadFromSocket(socket, 2, _receiveBuffer, (headerData) =>
        {
            if(headerData.Count == 0)
            {
                // nothing/closed
                return;
            }

            // Read the length of the data.
            int length = BitConverter.ToInt16(headerData.Array, headerData.Offset);

            // if the receive buffer is too small, reallocate it.
            if(_receiveBuffer.Length < length)
                _receiveBuffer = new byte[length];

            SocketReader.ReadFromSocket(socket, length, _receiveBuffer, (dataBufferSegment) =>
            {
                if(dataBufferSegment.Count == 0)
                {
                    // nothing/closed
                    return;
                }

                try
                {
                    process(dataBufferSegment);
                }
                catch { }

                StartReceiving(socket, process);
            });
        }); 
    }
}

Usage:

private PacketReader _reader;

public void Start()
{
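    // PacketReader has no constructor taking arguments; pass the connected socket to StartReceiving.
    _reader = new PacketReader();
    _reader.StartReceiving(socket, HandlePacket);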
}

private void HandlePacket(ArraySegment<byte> packet)
{
    // do stuff.....
}
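
Because PacketReader reuses `_receiveBuffer`, the segment passed to the handler is only valid until the handler returns; the next receive overwrites the same array. If a packet has to outlive the callback (for example, when it is queued for another thread), it would need to be copied first. A minimal sketch, where `PacketCopy.ToOwnedArray` is a hypothetical helper name:

```csharp
using System;

// Hypothetical helper, not part of the original answer.
public static class PacketCopy
{
    // Copies the bytes covered by the segment into a new array owned by the caller.
    public static byte[] ToOwnedArray(ArraySegment<byte> packet)
    {
        if (packet.Count == 0)
            return new byte[0];

        var copy = new byte[packet.Count];
        Buffer.BlockCopy(packet.Array, packet.Offset, copy, 0, packet.Count);
        return copy;
    }
}
```

Inside `HandlePacket`, calling `PacketCopy.ToOwnedArray(packet)` before handing the data to other code would keep it intact after the buffer is reused. If the packet is fully processed inside the callback, no copy is needed.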
