使用Reactive Extensions观察传入的Websocket消息吗? [英] Observing incoming websocket messages with Reactive Extensions?

查看:77
本文介绍了使用Reactive Extensions观察传入的Websocket消息吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用linq处理通过websocket连接接收的事件.这是我到目前为止的内容:

I want to use linq to process events received via a websocket connection. This is what I have so far:

    private static void Main()
    {
        string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
        using (WebSocket ws = new WebSocket(WsEndpoint))
        {
            ws.OnMessage += Ws_OnMessage;

            ws.Connect();
            Console.ReadKey();
            ws.Close();
        }
    }

    private static void Ws_OnMessage(object sender, MessageEventArgs e)
    {
        Console.WriteLine(e.Data);
    }

首先困扰我的是如何将 ws.OnMessage 转换为某种事件流.我无法在网上找到任何示例来观察具有反应性扩展的 external 事件源.我打算将消息解析为json对象,然后对其进行过滤和汇总.

The first think that stumps me is how to turn ws.OnMessage into some sort of event stream. I cannot find any examples online for observing an external event source with reactive extensions. I intend to parse the messages into json objects, then filter and aggregate them.

有人可以提供一个示例,从Websocket消息中创建一个可观察对象并进行订阅吗?

Could someone provide an example of creating an observable from the websocket messages, and subscribing to it?

与选择的答案唯一的不同是,我在将Websocket传递给 Observable.Using

The only difference from the chosen answer is that I initialized the websocket before passing it into Observable.Using

//-------------------------------------------------------
// Create websocket connection
//-------------------------------------------------------
const string wsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";
WebSocket socket = new WebSocket(wsEndpoint);


//-------------------------------------------------------
// Create an observable by wrapping ws.OnMessage
//-------------------------------------------------------
var globalEventStream = Observable
    .Using(
        () => socket,
        ws =>
            Observable
                .FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
                    handler => ws.OnMessage += handler,
                    handler => ws.OnMessage -= handler));
//---------------------------------------------------------
// Subscribe to globalEventStream
//---------------------------------------------------------

IDisposable subscription = globalEventStream.Subscribe(ep =>
{
    Console.WriteLine("Event Recieved");
    Console.WriteLine(ep.EventArgs.Data);
});

//----------------------------------------------------------
// Send message over websocket
//----------------------------------------------------------
socket.Connect();
socket.Send("test message");
// When finished, close the connection.
socket.Close();

推荐答案

您应该这样设置可观察对象:

You should set up your observable like this:

    var observable =
        Observable
            .Using(
                () => new WebSocket(WsEndpoint),
                ws =>
                    Observable
                        .FromEventPattern<EventHandler<MessageEventArgs>, MessageEventArgs>(
                            handler => ws.OnMessage += handler,
                            handler => ws.OnMessage -= handler));

这将正确创建套接字,然后在观察到的订阅时观察事件.取消订阅后,它将正确脱离事件并丢弃套接字.

This will correctly create the socket and then observe the event when the observable is subscribed to. When the subscription is disposed it will correctly detach from the event and dispose of the socket.

observable 的类型将为 IObservable< EventPattern< MessageEventArgs>> .您可以通过以下方式消耗此可观察值:

The type of observable will be IObservable<EventPattern<MessageEventArgs>>. You consume this observable in this way:

IDisposable subscription = observable.Subscribe(ep =>
{
    Console.WriteLine(ep.EventArgs.Data);
});


感谢您发布NuGet参考.


Thanks for posted the NuGet reference.

这是工作代码:

const string WsEndpoint = "wss://push.planetside2.com/streaming?environment=ps2&service-id=s:quicktesting";

Console.WriteLine("Defining Observable:");

IObservable<EventPattern<WebSocketSharp.MessageEventArgs>> observable =
    Observable
        .Using(
            () =>
            {
                var ws = new WebSocketSharp.WebSocket(WsEndpoint);
                ws.Connect();
                return ws;
            },
            ws =>
                Observable
                    .FromEventPattern<EventHandler<WebSocketSharp.MessageEventArgs>, WebSocketSharp.MessageEventArgs>(
                        handler => ws.OnMessage += handler,
                        handler => ws.OnMessage -= handler));

Console.WriteLine("Subscribing to Observable:");

IDisposable subscription = observable.Subscribe(ep =>
{
    Console.WriteLine("Event Recieved");
    Console.WriteLine(ep.EventArgs.Data);
});

Console.WriteLine("Writing to Source:");

using (var source = new WebSocketSharp.WebSocket(WsEndpoint))
{
    source.Connect();
    source.Send("test");
}

这篇关于使用Reactive Extensions观察传入的Websocket消息吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆