WebSocket可观察到RxJs [英] RxJs Observable with WebSocket
问题描述
我的angular应用程序使用websocket与后端进行通信.
My angular application uses a websocket to communicate with the backend.
在我的测试案例中,我有2个客户端组件.可观察计时器会按预期打印两个不同的客户端ID.
In my test case I have 2 client components. The Observable timer prints two different client id's as expected.
每个ngOnInit()也会打印其客户端的ID.
Each ngOnInit() also prints the id of its client.
现在由于某种原因,每条消息都会调用websocketService.observeClient()的预订两次,但this.client.id
始终会打印第二个客户端的值.
NOW for some reason, the subscription of websocketService.observeClient() is called 2 times for each message but this.client.id
always prints the value of the second client.
这里有我的客户组件
@Component({
...
})
export class ClientComponent implements OnInit {
@Input() client: Client;
constructor(public websocketService: WebsocketService) {
Observable.timer(1000, 1000).subscribe(() => console.log(this.client.id));
}
ngOnInit() {
console.log(this.client.id);
this.websocketService.observeClient().subscribe(data => {
console.log('message', this.client.id);
});
}
}
还有我的websocket服务
And my websocket Service
@Injectable()
export class WebsocketService {
private observable: Observable<MessageEvent>;
private observer: Subject<Message>;
constructor() {
const socket = new WebSocket('ws://localhost:9091');
this.observable = Observable.create(
(observer: Observer<MessageEvent>) => {
socket.onmessage = observer.next.bind(observer);
socket.onerror = observer.error.bind(observer);
socket.onclose = observer.complete.bind(observer);
return socket.close.bind(socket);
}
);
this.observer = Subject.create({
next: (data: Message) => {
if (socket.readyState === WebSocket.OPEN) {
socket.send(JSON.stringify(data));
}
}
});
}
observeClient(): Observable<MessageEvent> {
return this.observable;
}
}
编辑
Edit
据我所知,它与Observables是单播对象有关,我必须为此使用Subject,但是我不知道如何创建Subject.
Ok as far as I have read it has to do with the fact that Observables are unicast objects and I have to use a Subject for that but I don't know how to create the Subject.
推荐答案
从rxjs 5开始,您可以使用内置的websocket功能来为您创建主题.发生错误后重新订阅流时,它也会重新连接.请参考以下答案:
As of rxjs 5 you can use the built-in websocket feature which creates the subject for you. It also reconnects when you resubscribe to the stream after an error. Please refer to this answer:
https://stackoverflow.com/a/44067972/552203
TLDR:
let subject = Observable.webSocket('ws://localhost:8081');
subject
.retry()
.subscribe(
(msg) => console.log('message received: ' + msg),
(err) => console.log(err),
() => console.log('complete')
);
subject.next(JSON.stringify({ op: 'hello' }));
这篇关于WebSocket可观察到RxJs的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!