使用 WebSocket 可观察的 RxJs [英] RxJs Observable with WebSocket

查看:66
本文介绍了使用 WebSocket 可观察的 RxJs的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的 Angular 应用程序使用 websocket 与后端通信.

My angular application uses a websocket to communicate with the backend.

在我的测试用例中,我有 2 个客户端组件.Observable 计时器按预期打印两个不同的客户端 ID.

In my test case I have 2 client components. The Observable timer prints two different client id's as expected.

每个 ngOnInit() 还会打印其客户端的 ID.

Each ngOnInit() also prints the id of its client.

现在由于某种原因,每条消息都会调用 websocketService.observeClient() 的订阅 2 次,但 this.client.id 总是打印第二个客户端的值.

NOW for some reason, the subscription of websocketService.observeClient() is called 2 times for each message but this.client.id always prints the value of the second client.

这是我的客户端组件

@Component({
...
})
export class ClientComponent implements OnInit {

  @Input() client: Client;

  constructor(public websocketService: WebsocketService) {
    Observable.timer(1000, 1000).subscribe(() => console.log(this.client.id));
  }

  ngOnInit() {

    console.log(this.client.id);
    this.websocketService.observeClient().subscribe(data => {
      console.log('message', this.client.id);
    });

  }

}

还有我的 websocket 服务

And my websocket Service

@Injectable()
export class WebsocketService {

  private observable: Observable<MessageEvent>;
  private observer: Subject<Message>;

  constructor() {

    const socket = new WebSocket('ws://localhost:9091');

    this.observable = Observable.create(
      (observer: Observer<MessageEvent>) => {
        socket.onmessage = observer.next.bind(observer);
        socket.onerror = observer.error.bind(observer);
        socket.onclose = observer.complete.bind(observer);
        return socket.close.bind(socket);
      }
    );

    this.observer = Subject.create({
      next: (data: Message) => {
        if (socket.readyState === WebSocket.OPEN) {
          socket.send(JSON.stringify(data));
        }
      }
    });

  }

  observeClient(): Observable<MessageEvent> {
    return this.observable;
  }

}

<小时>

编辑

好的,据我所知,它与 Observables 是单播对象这一事实有关,我必须为此使用一个主题,但我不知道如何创建主题.

Ok as far as I have read it has to do with the fact that Observables are unicast objects and I have to use a Subject for that but I don't know how to create the Subject.

推荐答案

从 rxjs 5 开始,您可以使用内置的 websocket 功能为您创建主题.当您在发生错误后重新订阅流时,它也会重新连接.请参考这个答案:

As of rxjs 5 you can use the built-in websocket feature which creates the subject for you. It also reconnects when you resubscribe to the stream after an error. Please refer to this answer:

https://stackoverflow.com/a/44067972/552203

TLDR:

 let subject = Observable.webSocket('ws://localhost:8081');
 subject
   .retry()
   .subscribe(
      (msg) => console.log('message received: ' + msg),
      (err) => console.log(err),
      () => console.log('complete')
    );
 subject.next(JSON.stringify({ op: 'hello' }));

这篇关于使用 WebSocket 可观察的 RxJs的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆