在 Angular 和 rxjs 中重新连接 websocket? [英] Reconnecting a websocket in Angular and rxjs?

查看:50
本文介绍了在 Angular 和 rxjs 中重新连接 websocket?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个基于 ngrx/store (v2.2.2) 和 rxjs (v5.1.0) 的应用程序,它使用 observable 侦听 Web 套接字以获取传入数据.当我启动应用程序时,我可以完美地接收传入的数据.

但是过了一段时间(更新很少),连接似乎丢失了,我不再收到传入的数据.我的代码:

服务

import { Injectable, OnInit } from '@angular/core';从 'rxjs' 导入 { Observable };@Injectable()导出类 MemberService 实现 OnInit {私人网络套接字:任何;私人目的地:string = "wss://notessensei.mybluemix.net/ws/time";构造函数(){}ngOnInit() { }listenToTheSocket(): Observable{this.websocket = new WebSocket(this.destination);this.websocket.onopen = () =>{console.log("WebService 连接到" + this.destination);}返回 Observable.create(观察者 => {this.websocket.onmessage = (evt) =>{观察者.next(evt);};}).map(res => res.data).分享();}}

订阅者

 导出类 AppComponent 实现 OnInit {构造函数(/*私有存储:Store,*/private memberService:MemberService){}ngOnInit() {this.memberService.listenToTheSocket().subscribe((result: any) => {尝试 {控制台日志(结果);//const 成员:any = JSON.parse(result);//this.store.dispatch(new MemberActions.AddMember(member));}赶上(前){console.log(JSON.stringify(ex));}})}}

我需要做什么才能在超时时重新连接 Web 套接字,以便 observable 继续发出传入的值?

我看了一些问答这里这里here 它似乎没有解决这个问题(以我能理解的方式).>

注意:wss://notessensei.mybluemix.net/ws/time 上的 websocket 是实时的,并且每分钟发出一次时间戳(以防有人想测试).

非常感谢您的建议!

解决方案

实际上现在 rxjs 中有一个 WebsocketSubject!

 import { webSocket } from 'rxjs/webSocket'//对于 RxJS 6,对于 v5 使用 Observable.webSocketlet subject = webSocket('ws://localhost:8081');主题.订阅((味精) =>console.log('收到消息:' + msg),(错误) =>控制台日志(错误),() =>console.log('完成'));subject.next(JSON.stringify({ op: 'hello' }));

当您重新订阅断开的连接时,它会处理重新连接.所以例如写这个来重新连接:

subject.retry().subscribe(...)

查看文档了解更多信息.不幸的是,搜索框没有显示该方法,但您可以在这里找到它:

http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-webSocket

#-navigation 在我的浏览器中不起作用,因此在该页面上搜索webSocket".

来源:http://reactivex.io/rxjs/file/es6/observable/dom/WebSocketSubject.js.html#lineNumber15

I have a ngrx/store (v2.2.2) and rxjs (v5.1.0) based application that listens to a web socket for incoming data using an observable. When I start the application I receive incoming data flawlessly.

However after a while (updates are coming in quite infrequently) the connection seem to get lost and I don't get anymore incoming data. My code:

The service

import { Injectable, OnInit } from '@angular/core';
import { Observable } from 'rxjs';

@Injectable()
export class MemberService implements OnInit {

  private websocket: any;
  private destination: string = "wss://notessensei.mybluemix.net/ws/time";

  constructor() { }

  ngOnInit() { }

  listenToTheSocket(): Observable<any> {

    this.websocket = new WebSocket(this.destination);

    this.websocket.onopen = () => {
      console.log("WebService Connected to " + this.destination);
    }

    return Observable.create(observer => {
      this.websocket.onmessage = (evt) => {
        observer.next(evt);
      };
    })
      .map(res => res.data)
      .share();
  }
}

The subscriber

  export class AppComponent implements OnInit {

  constructor(/*private store: Store<fromRoot.State>,*/ private memberService: MemberService) {}

  ngOnInit() {
    this.memberService.listenToTheSocket().subscribe((result: any) => {
      try {
        console.log(result);
        // const member: any = JSON.parse(result);
        // this.store.dispatch(new MemberActions.AddMember(member));
      } catch (ex) {
        console.log(JSON.stringify(ex));
      }
    })
  }
}

What do I need to do to reconnect the web socket when it times out, so the observable continues to emit incoming values?

I had a look at some Q&A here, here and here and it didn't seem to address this question (in a way I could comprehend).

Note: the websocket at wss://notessensei.mybluemix.net/ws/time is live and emits a time stamp once a minute (in case one wants to test that).

Advice is greatly appreciated!

解决方案

Actually there now is a WebsocketSubject in rxjs!

 import { webSocket } from 'rxjs/webSocket' // for RxJS 6, for v5 use Observable.webSocket

 let subject = webSocket('ws://localhost:8081');
 subject.subscribe(
    (msg) => console.log('message received: ' + msg),
    (err) => console.log(err),
    () => console.log('complete')
  );
 subject.next(JSON.stringify({ op: 'hello' }));

It does handle reconnection when you resubscribe to a broken connection. So for example write this to reconnect:

subject.retry().subscribe(...)

See the docs for more info. Unfortunately the searchbox doesn't show the method, but you find it here:

http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html#static-method-webSocket

that #-navigation is not working in my browser, so search for "webSocket" on that page.

Source: http://reactivex.io/rxjs/file/es6/observable/dom/WebSocketSubject.js.html#lineNumber15

这篇关于在 Angular 和 rxjs 中重新连接 websocket?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆