Rxjs - 订阅相互依赖的 observables [英] Rxjs - Subscribing to interdependent observables
问题描述
我正在学习 angular 2 和 rxjs.我有 3 个变量,A B C.
I am learning angular 2 and rxjs. I have 3 variables, A B C.
- B 取决于 A 的值
- C 取决于 A 和 B 的值
我正在尝试设置可观察对象,以便:当 A 更新时,B 和 C 将自动更新.当 B 更新时,C 将自动更新.我尝试了两种设置,但都不令人满意.
I am trying to setup up the observables such that: When A is updated, B and C will be auto updated. When B is updated, C will be auto updated. I tried two setups but they are not satisfactory.
- 第一次设置:B 订阅了 observable A;C 订阅 observable B withlatestfrom A.A 中的变化确实向下级联到 B 然后到 C,但来自 A 的值不是最新的.
- 第二个设置:B 订阅 observable A;C 订阅了 A 和 B 的 combineLatest observable.此设置有效,但我收到了 C 的两个更新,首先来自 B,然后来自 A.
- First setup: B subscribes observable A; C subscribes observable B withlatestfrom A. The changes in A did cascade down to B then to C but the value from A is not the latest.
- Second setup: B subscribes observable A; C subscribes combineLatest observable of A and B. This setup works but I get two updates for C, first from B and then from A.
如何设置我的 observables/订阅,以便当 A 更新时,C 只会使用来自 A 和 B 的最新值更新一次?
How can I set up my observables / subscription such that when A is updated, C will only get updated once with latest value from A and B?
编辑 - 添加代码
var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B = new Rx.BehaviorSubject(10);
var B_Observable = B.asObservable();
A_Observable.subscribe(function(A_value) {
var newB = A_value * 10;
// console.log("B auto updating to " + newB);
B.next(newB);
});
// LATEST FROM OBSERVABLE
var latestFromObservable = B_Observable.withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});
// COMBINE ALL OBSERVABLE
var combineAllObservable = Rx.Observable.combineLatest(A_Observable,B_Observable);
combineAllObservable.subscribe(function(data) {
console.log("COMBINE LATEST : Value for A is " + data[0] + " ; Value for B is " + data[1]);
});
// UPDATE TO A
setTimeout(function(){
console.log("UPDATING A");
A.next(2);
},1000);
// SATISFACTORY RESULT
setTimeout(function(){
console.log("SATISFACTORY RESULT : Value for A is 2 ; Value for B is 20 --- CALLED ONLY ONCE WITH LATEST VALUES");
},2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
推荐答案
代码的行为方式是这样的,因为默认情况下 RxJs 是同步运行代码的.withLatestFrom
案例的事件链:
Code behaves this way because by default RxJs runs code synchronously. Chain of events for withLatestFrom
case:
- 你推送新的
A
值:A.next(2);
- RxjS看到
A
有两种用法:
B
-订阅(先到先得)withLatestFrom
(排在第二位)
B
-subscription (comes first)withLatestFrom
(comes second)
B
-subscription
B
-subscription
- 推送
B
的新值 - RxjS看到有一个
B
的用法:C
-subscription - RxJs 调用
C
-subscription(旧值为A
!)
- It pushes new value of
B
- RxjS sees that there is a usage of
B
:C
-subscription - RxJs calls
C
-subscription (with old value ofA
!)
A
的新值被拉入 withLatestFrom
但C
-subscription 没有被触发(因为它只被 触发B
更新)
A
is pulled in withLatestFrom
but C
-subscription isn't triggered (because it is triggered only by B
updates)可能的解决方案是添加debounceTime(0)
.当withLatestFrom
中的值已经存在时,它会强制C
-subscription异步运行更新了.
Possible solution is to add debounceTime(0)
.It will force C
-subscription to run asynchronously when the value inside withLatestFrom
is already updated.
有关 RxJs 的 Scheduler
的更多信息,请谷歌.
For more info google for RxJs's Scheduler
.
var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B = new Rx.BehaviorSubject(10);
var B_Observable = B.asObservable();
A_Observable.subscribe(function(A_value) {
var newB = A_value * 10;
// console.log("B auto updating to " + newB);
B.next(newB);
});
// LATEST FROM OBSERVABLE
var latestFromObservable = B_Observable.debounceTime(0).withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});
// COMBINE ALL OBSERVABLE
var combineAllObservable = Rx.Observable.combineLatest(A_Observable,B_Observable).debounceTime(0);
combineAllObservable.subscribe(function(data) {
console.log("COMBINE LATEST : Value for A is " + data[0] + " ; Value for B is " + data[1]);
});
// UPDATE TO A
setTimeout(function(){
console.log("UPDATING A");
A.next(2);
},1000);
// SATISFACTORY RESULT
setTimeout(function(){
console.log("SATISFACTORY RESULT : Value for A is 2 ; Value for B is 20 --- CALLED ONLY ONCE WITH LATEST VALUES");
},2000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
我想补充两点:
withLatestFrom
方法在您的情况下更好,因为依赖关系图看起来像C ->B->A
并且当 A 更新时 B 也更新.如果A
和B
是独立的,那么combineLatest
将是更好的选择.B
不需要Subject
.你可以像这样重写代码(没有debounceTime
!)
withLatestFrom
approach is better in your case because dependency graph looks likeC -> B -> A
and when A is updated B is updated too. IfA
andB
were independent thencombineLatest
would be the better choice.- No need in
Subject
forB
. You can rewrite code like this (nodebounceTime
!)
var A = new Rx.BehaviorSubject(1);
var A_Observable = A.asObservable();
var B_Observable = A_Observable.map(function(A_value) {
var newB = A_value * 10;
// console.log("B auto updating to " + newB);
return newB;
});
var latestFromObservable = B_Observable.withLatestFrom(A_Observable);
latestFromObservable.subscribe(function(data) {
console.log("LATEST FROM : Value for A is " + data[1] + " ; Value for B is " + data[0]);
});
// UPDATE TO A
setTimeout(function(){
console.log("UPDATING A");
A.next(2);
},1000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
这篇关于Rxjs - 订阅相互依赖的 observables的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!