Rxjs-只有第一个观察者才能看到来自observable.share()的数据 [英] Rxjs - only the first observer can see data from observable.share()
问题描述
我有一些如下的代码段
var videosNeedFix = Rx.Observable.fromArray(JSON.parse(fs.readFileSync("videoEntries.json"))).share();
videosNeedFix.count().subscribe(function(count){ //subscrption A
console.log(count + " in total");
});
videosNeedFix.subscribe(function(videoEntry){ //subscription B
console.log(videoEntry.id, videoEntry.name, videoEntry.customFields);
});
videoEntries.json是videoEntry对象的JSON序列化数组.我希望订阅A和订阅B都能接收到videoNeedFix可观察到的数据.
The videoEntries.json is a JSON-serialized array of videoEntry object. I'm expecting both subscription A and subscription B to receive the data emitted by videosNeedFix observable.
但是,根据控制台日志,只有订阅A将接收数据,而没有订阅B.如果我交换进行两个订阅的顺序,则只有subscriptionB会看到数据.可观察者为什么只向第一个订阅发出数据?
However, according to the console log, only the subscription A will receive data but not the subscriptionB. If I swap the order of making the two subscriptions, only subscriptionB will see the data. How come the observable only emits data to the first subscription?
推荐答案
This is a good use case (and maybe the only - see To Use Subject Or Not To Use Subject?) for a Rx.Subject
请考虑以下示例.这段代码(注释中提到的.delay()hack)可以正常工作,但对我来说似乎有点hacky:
Consider the following example. This code (with .delay() hack mentioned in the comments) will work, but seems a bit hacky to me:
let stream$ = Rx.Observable
.return(updatesObj)
.map(obj => Object.assign({}, obj.localData, obj.updates))
.delay(1) //Hacky way of making it work
.share()
stream$
.flatMap(obj => Observable.fromPromise(AsyncStorage.setItem('items', JSON.stringify(obj))))
.catch(Observable.return(false))
.subscribe()
stream$
.subscribe(obj => dispatch(dataIsReady(obj)))
带有Rx.Subjects的示例:
Example with Rx.Subjects:
let subjS = new Rx.Subject()
let stream$ = subjS
.map(obj => Object.assign({}, obj.localData, obj.updates))
.share()
stream$
.flatMap(obj => Observable.fromPromise(AsyncStorage.setItem('items', JSON.stringify(obj))))
.catch(Observable.return(false))
.subscribe()
stream$
.subscribe(obj => dispatch(dataIsReady(obj)))
subjS.onNext(updatesObj)
这篇关于Rxjs-只有第一个观察者才能看到来自observable.share()的数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!