从(发送的服务器)EventSource创建可观察的RxJS [英] Creating an RxJS Observable from a (server sent) EventSource
问题描述
我想从EventSource(服务器发送的事件)创建一个RxJs Observable.
I would like to create a RxJs Observable from an EventSource (server sent events).
我尝试了以下操作:
import {Component, OnInit} from 'angular2/core';
import {Subject, Observable} from 'rxjs/Rx';
@Component({
selector: 'my-app',
template: `<h1>My second Angular 2 App</h1>
<ul>
<li *ngFor="#s of someStrings">
a string: {{ s }}
</li>
</ul>
`
})
export class AppComponent implements OnInit {
someStrings:string[] = [];
ngOnInit() {
let eventSource = new EventSource('/interval-sse-observable');
let observable = Observable.create(eventSource);
observable.subscribe({
next: aString => this.someStrings.push(aString.data),
error: err => console.error('something wrong occurred: ' + err)
});
}
}
但是我得到以下异常:
EXCEPTION: Error: Uncaught (in promise): EXCEPTION: TypeError: this._subscribe is not a function in [null]
ORIGINAL EXCEPTION: TypeError: this._subscribe is not a function
ORIGINAL STACKTRACE:
TypeError: this._subscribe is not a function
at Observable.subscribe (https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/Rx.js:11210:29)
at AppComponent.ngOnInit (http://localhost:8080/scripts/app.component.ts!transpiled:30:28)
at AbstractChangeDetector.ChangeDetector_HostAppComponent_0.detectChangesInRecordsInternal (viewFactory_HostAppComponent:21:99)
at AbstractChangeDetector.detectChangesInRecords (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9689:14)
at AbstractChangeDetector.runDetectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9672:12)
at AbstractChangeDetector.detectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:9661:12)
at ChangeDetectorRef_.detectChanges (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:5280:16)
at https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:13048:27
at Array.forEach (native)
at ApplicationRef_.tick (https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js:13047:34)
出于完整性考虑,这是我的index.html的内容:
For completeness' sake, here is the contents of my index.html:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>sse demo</title>
<!-- 1. Load libraries -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/angular2-polyfills.js"></script>
<script src="https://code.angularjs.org/tools/system.js"></script>
<script src="https://code.angularjs.org/tools/typescript.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/Rx.js"></script>
<script src="https://code.angularjs.org/2.0.0-beta.15/angular2.dev.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/angular.js/2.0.0-beta.15/http.dev.js"></script>
<!-- 2. Configure SystemJS -->
<script>
System.config({
transpiler: 'typescript',
typescriptOptions: { emitDecoratorMetadata: true }
});
System.import('./scripts/app.ts')
.then(null, console.error.bind(console));
</script>
</head>
<body>
<my-app>Loading...</my-app>
</body>
</html>
有人可以帮忙吗?
编辑1 :按照Yurzui的建议,我对代码进行了如下修改:
edit 1: Following Yurzui's advice, I modified my code as follows:
ngOnInit() {
let observable = Observable.create(observer => {
const eventSource = new EventSource('/interval-sse-observable');
eventSource.onmessage = x => observer.next(console.log(x));
eventSource.onerror = x => observer.error(console.log('EventSource failed'));
return () => {
eventSource.close();
};
});
observable.subscribe({
next: aString => this.someStrings.push(aString.data),
error: err => console.error('something wrong occurred: ' + err)
});
}
它确实将控制台中的第一条消息记录如下:
It does log the first message in the console as follows:
MessageEvent {isTrusted: true, data: "c374a15b-b37d-498e-8ab0-49643b79c1bb", origin: "http://localhost:8080", lastEventId: "", source: null…}bubbles: falsecancelBubble: falsecancelable: falsecurrentTarget: EventSourcedata: "c374a15b-b37d-498e-8ab0-49643b79c1bb"defaultPrevented: falseeventPhase: 0isTrusted: trueisTrusted: truelastEventId: ""origin: "http://localhost:8080"path: Array[0]ports: nullreturnValue: truesource: nullsrcElement: EventSourcetarget: EventSourcetimeStamp: 6257.125type: "message"__proto__: MessageEvent
Rx.js:10982 Uncaught TypeError: Cannot read property 'data' of undefinedSystem.register.exports_1.execute.AppComponent.ngOnInit.observable.subscribe.next @ app.component.ts:29SafeSubscriber.__tryOrUnsub @ Rx.js:10979SafeSubscriber.next @ Rx.js:10934Subscriber._next @ Rx.js:10894Subscriber.next @ Rx.js:10871System.register.exports_1.execute.AppComponent.ngOnInit.Rx_1.Observable.create.eventSource.onmessage @ app.component.ts:21
现在,如果不是在控制台中记录x
变量,而是将其传递给下一个方法,如下所示:
Now if instead of logging the x
variable in the console I just pass it to the next method as follows:
eventSource.onmessage = x => observer.next(x);
服务器发送的事件由客户端检索(我在chrome dev工具中看到了它们),但是模板中没有任何显示,表明未填充字符串数组...
The server sent events are retrieved by the client (I see them in the chrome dev tools) but nothing is displayed in the template indicating the array of strings is not populated...
我必须删除JSON.parse(x.data)
,因为它会导致错误.
By the way I had to remove the JSON.parse(x.data)
as it was causing an error.
推荐答案
您可以使用以下代码为EventSource流手动创建Observable:
You could use the following code to manually create Observable for EventSource stream:
export class AppComponent implements OnInit {
someStrings:string[] = [];
constructor(private zone: NgZone) {}
ngOnInit(){
const observable = Observable.create(observer => {
const eventSource = new EventSource('/interval-sse-observable');
eventSource.onmessage = x => observer.next(x.data);
eventSource.onerror = x => observer.error(x);
return () => {
eventSource.close();
};
});
this.subscription = observable.subscribe({
next: guid => {
this.zone.run(() => this.someStrings.push(guid));
},
error: err => console.error('something wrong occurred: ' + err)
});
}
}
// somewhere
// this.subscription.unsubscribe()
别忘了导入NgZone类:
Don't forget to import the NgZone class:
import {Component, OnInit, NgZone} from '@angular/core';
这篇关于从(发送的服务器)EventSource创建可观察的RxJS的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!