接收连接的 Websocket 史诗消息请求和发出消息 &连接状态更新 [英] Websocket epic that receives connection & message requests and emits messages & connection status updates

查看:51
本文介绍了接收连接的 Websocket 史诗消息请求和发出消息 &连接状态更新的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我希望创建一个 redux-observable 史诗,它可以与我的应用程序的其余部分分开.它需要:

I am hoping to create an redux-observable epic that can sit separate to the rest of my application. It needs to:

  • 侦听 { type: "SOCKET_TRY_CONNECT" } 的传入操作,这也可能会在连接时忽略任何其他 SOCKET_TRY_CONNECT 事件.另外监听要发送的消息,可能 { type: "SOCKET_MESSAGE_SEND", data }
  • 发出传出动作 { type: "SOCKET_CONNECTED" }, { type: "SOCKET_DISCONNECTED", error }{ type: "SOCKET_MESSAGE_RECEIVE", data }
  • Listen for incoming action of { type: "SOCKET_TRY_CONNECT" }, this should also probably disregard any additional SOCKET_TRY_CONNECT events while it is connected. Additionally listen for messages to be sent, maybe { type: "SOCKET_MESSAGE_SEND", data }
  • Emit outgoing actions { type: "SOCKET_CONNECTED" }, { type: "SOCKET_DISCONNECTED", error } and { type: "SOCKET_MESSAGE_RECEIVE", data }

epic 需要监听传入的套接字连接请求,建立套接字连接,然后在连接建立或丢失时输出状态更新.它还需要能够发送和接收消息,然后可以在其他地方进行处理.

The epic needs to listen for incoming socket connection requests, establish a socket connection and then output status updates when the connection is established or is lost. It also needs to be able to both send and receive messages which can then be processed elsewhere.

我对此最接近的是 这个问题:

The closest I have come to this, is the answer provided in this question:

const somethingEpic = action$ =>
  action$.ofType('START_SOCKET_OR_WHATEVER')
    .switchMap(action =>
      Observable.webSocket('ws://localhost:8081')
        .map(response => ({ type: 'RECEIVED_MESSAGE', paylod: response }))
    );

但是我不确定如何扩展它以额外发出连接建立和断开连接事件,并额外接受要发送到服务器的消息.

However I am unsure how to extend this to additionally emit both connection established and disconnected events, and additionally accept messages to be sent to server.

推荐答案

一般来说,听起来你想要这样的东西:

Generally speaking it sounds like you want something like this:

(注意,这是未经测试的代码,但应该非常接近可运行)

(note, this is untested code but should be pretty close to runnable)

const somethingEpic = action$ =>
  action$.ofType('START_SOCKET_OR_WHATEVER')
    .switchMap(action => {
      // Subjects are a combination of an Observer *and* an Observable
      // so webSocket can call openObserver$.next(event) and
      // anyone who is subscribing to openObserver$ will receive it
      // because Subjects are "hot"
      const openObserver$ = new Subject();
      const openObserver$ = new Subject();

      // Listen for our open/close events and transform them
      // to redux actions. We could also include values from
      // the events like event.reason, etc if we wanted
      const open$ = openObserver$.map((event) => ({
        type: 'SOCKET_CONNECTED'
      }));
      const close$ = openObserver$.map((event) => ({
        type: 'SOCKET_DISCONNECTED'
      }));

      // webSocket has an overload signature that accepts this object
      const options = {
        url: 'ws://localhost:8081',
        openObserver: openObserver$,
        closeObserver: openObserver$
      };
      const msg$ = Observable.webSocket(options)
        .map(response => ({ type: 'RECEIVED_MESSAGE', payload: response }))
        .catch(e => Observable.of({
          type: 'SOCKET_ERROR',
          payload: e.message
        }))

      // We're merging them all together because we want to listen for
      // and emit actions from all three. For good measure I also included
      // a generic .takeUntil() to demonstrate the most obvious way to stop
      // the websocket (as well as the open/close, which we shouldn't forget!)
      // Also notice how I'm listening for both the STOP_SOCKET_OR_WHATEVER
      // or also a SOCKET_ERROR because we want to stop subscribing
      // to open$/close$ if there is an error.  
      return Observable.merge(open$, close$, msg$)
        .takeUntil(action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR'));
    });

如果这部史诗需要一次支持多个套接字,您需要想出某种方法来唯一标识特定连接,并修改代码以基于此过滤信号.例如

If this epic needs to ever support multiple sockets at a time, you'll need to come up with some sort of way of uniquely identify a particular connection, and modify the code to filter signals based on that. e.g.

.takeUntil(
  action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR')
    .filter(action => action.someHowHaveId === someHowHaveId)
);

这篇关于接收连接的 Websocket 史诗消息请求和发出消息 &连接状态更新的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆