阅读使用TcpClient的和无扩展从流连续字节流 [英] Read continous bytestream from Stream using TcpClient and Reactive Extensions

查看:630
本文介绍了阅读使用TcpClient的和无扩展从流连续字节流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

考虑下面的代码:

internal class Program
{
    private static void Main(string[] args)
    {
        var client = new TcpClient();
        client.ConnectAsync("localhost", 7105).Wait();
        var stream = client.GetStream();
        var observable = stream.ReadDataObservable().Repeat();

        var s = from d in observable.Buffer(4)
                let headerLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt16(d.ToArray(), 2))
                let b = observable.Take(headerLength)
                select b.ToEnumerable().ToArray();
        s.Subscribe(a => Console.WriteLine("{0}", a));
        Console.ReadLine();
    }
}

public static class Extensions
{
    public static IObservable<byte> ReadDataObservable(this Stream stream)
    {
        return Observable.Defer(async () =>
        {
            var buffer = new byte[1024];
            var readBytes = await stream.ReadAsync(buffer, 0, buffer.Length);
            return buffer.Take(readBytes).ToObservable();
        });
    }
}



基本上,我想分析我收到无消息扩展。消息的标题被解析正确使用缓冲区(4),我得到消息的剩余部分的长度。所产生的问题是,当我做stream.Take(headerLength),代码重新评估整个产业链,并试图获得来自流中的新邮件,而不是返回其已被从流中读取字节的其余部分。更确切地说,第一ReadAsync(...)返回38字节,缓冲液(4)返回的那些的第一4,observable.Take(headerLength)不返回耳提面命34字节,而是试图读取一个新消息,ReadAsync。

Basically I want to parse the messages I receive with Reactive Extensions. The header of the message is parsed correctly using the Buffer(4) and I get the length of the remainder of the message. The problem that arises is that when I do stream.Take(headerLength), the code reevaluates the whole "chain" and tries to get a new message from the stream instead of returning the rest of the bytes which already has been read from the stream. To be more exact, the first ReadAsync(...) returns 38 bytes, the Buffer(4) returns the first 4 of those, the observable.Take(headerLength) does not return the remainding 34 bytes but instead tries to read a new message with ReadAsync.

现在的问题是,我如何能确保observable.Take(headerLength)接收已经看过34字节,而不是试图读取新邮件流?我已经搜索绕了解决方案,但我真的不能弄清楚如何做到这一点。

The question is, how can I make sure the observable.Take(headerLength) receives the already read 34 bytes and not try to read a new message from the stream? I've searched around for a solution, but I can't really figure out how to achieve this.

编辑:此解决方案(的Using反应扩展(Rx)在socket编程实用?)是不是我M寻找。这是不读书的一切可用流中(最多缓冲区大小),使一个连续的字节流出来。对我来说,这个解决方案似乎并不像一个非常有效的方式从流中读取,因此,我的问题。

This solution (Using Reactive Extensions (Rx) for socket programming practical?) is not what I'm looking for. This isn't reading everything available in the stream (up to buffersize) and makes a continous bytestream out of it. To me this solution doesn't seem like a very efficient way to read from a stream, hence my question.

推荐答案

此方法是行不通的。问题是你使用的是可观察的方式。 缓存将无法读取4个字节并退出,它会不断地读取4字节块。在形成第二订阅,将读取的字节重叠。你会发现它更容易直接解析流进的消息。

This approach isn't going to work. The problem is the way you are using the observable. Buffer will not read 4 bytes and quit, it will continually read 4 byte chunks. The Take forms a second subscription that will read overlapping bytes. You'll find it much easier to parse the stream directly into messages.

下面的代码使力气收拾妥当,以及一个很好的协议。

The following code makes a good deal of effort to clean up properly as well.

假设你的消息只是这一点,(的ToString 添加用于测试)

Assuming your Message is just this, (ToString added for testing):

public class Message
{
    public byte[] PayLoad;

    public override string ToString()
    {
        return Encoding.UTF8.GetString(PayLoad);
    }
}

和你已经获得了则可以按如下解析它。首先,一种方法从流中读取的字节一个确切的数字:

And you have acquired a Stream then you can parse it as follows. First, a method to read an exact number of bytes from a stream:

public async static Task ReadExactBytesAsync(
    Stream stream, byte[] buffer, CancellationToken ct)
{
    var count = buffer.Length;
    var totalBytesRemaining = count;
    var totalBytesRead = 0;
    while (totalBytesRemaining != 0)
    {
        var bytesRead = await stream.ReadAsync(
            buffer, totalBytesRead, totalBytesRemaining, ct);
        ct.ThrowIfCancellationRequested();
        totalBytesRead += bytesRead;
        totalBytesRemaining -= bytesRead;
    }
}



然后,流的转换的IObservable<消息>

public static IObservable<Message> ReadMessages(
    Stream sourceStream,
    IScheduler scheduler = null)
{
    int subscribed = 0;
    scheduler = scheduler ?? Scheduler.Default;

    return Observable.Create<Message>(o =>
    {
        // first check there is only one subscriber
        // (multiple stream readers would cause havoc)
        int previous = Interlocked.CompareExchange(ref subscribed, 1, 0);

        if (previous != 0)
            o.OnError(new Exception(
                "Only one subscriber is allowed for each stream."));

        // we will return a disposable that cleans
        // up both the scheduled task below and
        // the source stream
        var dispose = new CompositeDisposable
        {
            Disposable.Create(sourceStream.Dispose)
        };

        // use async scheduling to get nice imperative code
        var schedule = scheduler.ScheduleAsync(async (ctrl, ct) =>
        {
            // store the header here each time
            var header = new byte[4];

            // loop until cancellation requested
            while (!ct.IsCancellationRequested)
            {                        
                try
                {
                    // read the exact number of bytes for a header
                    await ReadExactBytesAsync(sourceStream, header, ct);
                }
                catch (OperationCanceledException)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    // pass through any problem in the stream and quit
                    o.OnError(new InvalidDataException("Error in stream.", ex));
                    return;
                }                   
                ct.ThrowIfCancellationRequested();

                var bodyLength = IPAddress.NetworkToHostOrder(
                    BitConverter.ToInt16(header, 2));
                // create buffer to read the message
                var payload = new byte[bodyLength];

                // read exact bytes as before
                try
                {
                    await ReadExactBytesAsync(sourceStream, payload, ct);
                }
                catch (OperationCanceledException)
                {
                    throw;
                }
                catch (Exception ex)
                {
                    o.OnError(new InvalidDataException("Error in stream.", ex));
                    return;
                }

                // create a new message and send it to client
                var message = new Message { PayLoad = payload };
                o.OnNext(message);
            }
            // wrap things up
            ct.ThrowIfCancellationRequested();
            o.OnCompleted();
        });

        // return the suscription handle
        dispose.Add(schedule);
        return dispose;
    });
}



编辑 - 非常哈克测试代码我用:

EDIT - Very hacky test code I used:

private static void Main(string[] args)
{
    var listener = new TcpListener(IPAddress.Any, 12873);
    listener.Start();

    var listenTask = listener.AcceptTcpClientAsync();
    listenTask.ContinueWith((Task<TcpClient> t) =>
    {
        var client = t.Result;
        var stream = client.GetStream();
        const string messageText = "Hello World!";                
        var body = Encoding.UTF8.GetBytes(messageText);                
        var header = BitConverter.GetBytes(
            IPAddress.HostToNetworkOrder(body.Length));
        for (int i = 0; i < 5; i++)
        {
            stream.Write(header, 0, 4);
            stream.Write(body, 0, 4);
            stream.Flush();
            // deliberate nasty delay
            Thread.Sleep(2000);
            stream.Write(body, 4, body.Length - 4);
            stream.Flush();
        }
        stream.Close();
        listener.Stop();
    });


    var tcpClient = new TcpClient();
    tcpClient.Connect(new IPEndPoint(IPAddress.Loopback, 12873));
    var clientStream = tcpClient.GetStream();

    ReadMessages(clientStream).Subscribe(
        Console.WriteLine,
        ex => Console.WriteLine("Error: " + ex.Message),
        () => Console.WriteLine("Done!"));

    Console.ReadLine();
}



结束语



你需要考虑设置超时读取,万一服务器死机,和某种最终消息应该由服务器发送。目前,这个方法会不断地只是试图接收字节。由于您没有specced它,我没有包含这样的事 - 但如果这样做,那么作为我写它只是 ING出来while循环会引起 OnCompleted 发送。

Wrapping up

You need to think about setting a timeout for reads, in case the server dies, and some kind of "end message" should be sent by the server. Currently this method will just continually tries to receive bytes. As you haven't specced it, I haven't included anything like this - but if you do, then as I've written it just breaking out of the while loop will cause OnCompleted to be sent.

这篇关于阅读使用TcpClient的和无扩展从流连续字节流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆