使用Reactive Extensions(Rx)进行套接字编程实用吗? [英] Using Reactive Extensions (Rx) for socket programming practical?

查看:84
本文介绍了使用Reactive Extensions(Rx)进行套接字编程实用吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

用Rx编写 GetMessages 函数的最简洁的方式是:

What is the most succint way of writing the GetMessages function with Rx:

static void Main()
{
    Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

    var messages = GetMessages(socket, IPAddress.Loopback, 4000);
    messages.Subscribe(x => Console.WriteLine(x));

    Console.ReadKey();
}

static IObservable<string> GetMessages(Socket socket, IPAddress addr, int port)
{
    var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);

    // now will receive a stream of messages
    // each message is prefixed with an 4 bytes/Int32 indicating it's length. 
    // the rest of the message is a string

    // ????????????? Now What ????????????? 
}

作为上述示例的驱动程序的简单服务器: http://gist.github.com/452893#file_program.cs

A simple server as a driver for the above sample: http://gist.github.com/452893#file_program.cs

我一直在使用反应性扩展,用于我正在做的一些套接字编程工作。我这样做的动机是,它将以某种方式使代码更简单。这是否意味着更少的代码,更少的嵌套内容。

I've been investigating using Reactive Extensions for some socket programming work I am doing. My motivation for doing so would be that it would somehow make the code "simpler". Whether this would mean less code, less nesting something along those lines.

但是到目前为止,似乎并非如此:

However so far that does not seem to be the case:


  1. I尚未找到使用Rx和套接字的很多示例

  2. 示例 s 我发现与我现有的BeginXXXX,EndXXXX代码一样,

  3. 尽管 Observable 具有 FromAsyncPattern的扩展方法,这不包括 SocketEventArgs 异步API。

  1. I haven't found very many examples of using Rx with sockets
  2. The examples I have found don't seem less complicated then my existing BeginXXXX, EndXXXX code
  3. Although Observable has extension methods for FromAsyncPattern, this does not cover the SocketEventArgs Async API.



当前的非工作解决方案



这是我到目前为止的内容。这是行不通的,它会因堆栈溢出而失败(heh)我还没有弄清楚语义,因此我可以创建一个 IObservable 来读取指定数量的

    static IObservable<int> GetMessages(Socket socket, IPAddress addr, int port)
    {
        var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);

        // keep reading until we get the first 4 bytes
        byte[] buffer = new byte[1024];
        var readAsync = Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);

        IObservable<int> readBytes = null;
        var temp = from totalRead in Observable.Defer(() => readBytes)
                   where totalRead < 4
                   select readAsync(buffer, totalRead, totalRead - 4, SocketFlags.None);
        readBytes = temp.SelectMany(x => x).Sum();

        var nowDoSomethingElse = readBytes.SkipUntil(whenConnect);
    }


推荐答案

按以下方式工作。这未经测试,没有考虑异常和部分返回消息的情况。但是,否则,我认为这是正确的方向。

Something along these lines could work. This is not tested, does not take into account exceptions and the case when a message is returned partially. But otherwise, I believe this is a right direction to go.

    public static IObservable<T> GetSocketData<T>(this Socket socket,
        int sizeToRead, Func<byte[], T> valueExtractor)
    {
        return Observable.CreateWithDisposable<T>(observer =>
        {
            var readSize = Observable
                .FromAsyncPattern<byte[], int, int, SocketFlags, int>(
                socket.BeginReceive,
                socket.EndReceive);
            var buffer = new byte[sizeToRead];
            return readSize(buffer, 0, sizeToRead, SocketFlags.None)
                .Subscribe(
                x => observer.OnNext(valueExtractor(buffer)),
                    observer.OnError,
                    observer.OnCompleted);
        });
    }

    public static IObservable<int> GetMessageSize(this Socket socket)
    {
        return socket.GetSocketData(4, buf => BitConverter.ToInt32(buf, 0));
    }

    public static IObservable<string> GetMessageBody(this Socket socket,
        int messageSize)
    {
        return socket.GetSocketData(messageSize, buf =>
            Encoding.UTF8.GetString(buf, 0, messageSize));
    }

    public static IObservable<string> GetMessage(this Socket socket)
    {

        return
            from size in socket.GetMessageSize()
            from message in Observable.If(() => size != 0,
                socket.GetMessageBody(size),
                Observable.Return<string>(null))
            select message;
    }

    public static IObservable<string> GetMessagesFromConnected(
        this Socket socket)
    {
        return socket
            .GetMessage()
            .Repeat()
            .TakeWhile(msg => !string.IsNullOrEmpty(msg));
    }

    public static IObservable<string> GetMessages(this Socket socket,
        IPAddress addr, int port)
    {
        return Observable.Defer(() => 
        {
            var whenConnect = Observable
                .FromAsyncPattern<IPAddress, int>(
                    socket.BeginConnect, socket.EndConnect);
            return from _ in whenConnect(addr, port)
                   from msg in socket.GetMessagesFromConnected()
                       .Finally(socket.Close)
                   select msg;
        });
    }

编辑:要处理不完整的读取,可以使用Observable。可以使用(在GetSockedData中) ),如Dave Sexton在在RX论坛上使用相同的线程

To handle incomplete reads, Observable.While can be used (within GetSockedData) as proposed by Dave Sexton in the same thread on RX forum.

编辑:另外,请看一下Jeffrey Van Gogh的文章:异步System.IO.Stream读取

Also, take a look at this Jeffrey Van Gogh's article: Asynchronous System.IO.Stream reading

这篇关于使用Reactive Extensions(Rx)进行套接字编程实用吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆