RXJS:动态创建的可观察对象中的单个可观察对象 [英] RXJS: Single Observable from dynamically created Observables

查看:159
本文介绍了RXJS:动态创建的可观察对象中的单个可观察对象的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试创建一个可观察对象(此处为 fixedObservable),它会随着时间的推移由多个动态创建的可观察对象提供数据。无论何时进行订阅,订阅者都应从 fixedObservable 获取最新的值。虽然我已经实现了这一点,但我想知道是否有更简单的解决方案。您可以在 https://jsfiddle.net/dzm2t3pa/8/ 找到代码。

谢谢

 // Container element (id "result") that print() appends its output paragraphs to.
 var resultdiv = document.getElementById('result');

/**
 * Appends `message` to the result container as a new <p> element.
 * The text is assigned through innerHTML, so any markup in it is parsed as HTML.
 */
function print(message) {
  var paragraph = Object.assign(document.createElement('p'), { innerHTML: message });
  resultdiv.appendChild(paragraph);
}
// Reference timestamp; the subscriber log reports elapsed seconds relative to it.
var startTime = new Date();

// Single long-lived Subject that every dynamically created observable pushes into.
// A plain Subject does not replay: a subscriber only sees values emitted while it is subscribed.
var fixedObservable = new Rx.Subject();
// Handle of the current subscription; assigned in subscribe() and released in unsubscribe().
var subscription;

/**
 * Attaches "subscriber 1" to fixedObservable and stores the resulting
 * subscription handle in `subscription` so that unsubscribe() can release it.
 * Every received value is printed together with the seconds elapsed since startTime.
 */
function subscribe() {
  var logValue = function(value) {
    var elapsedSecs = (new Date() - startTime) / 1000;
    print('subscriber 1:' + value + ' :after ' + elapsedSecs + ' secs');
  };
  subscription = fixedObservable.subscribe(logValue);
}

/** Releases the subscription most recently stored by subscribe(). */
function unsubscribe() {
  var active = subscription;
  active.unsubscribe();
}
// Subscribe to fixedObservable (i.e. the Subject) initially.
subscribe()

// Unsubscribe after 11 s and resubscribe after 22 s to see what happens
// to the values emitted in between.
// Observation: the values generated in that period are not received,
// although the latest one is wanted on resubscription.
setTimeout(unsubscribe, 11000);
setTimeout(subscribe, 22000);

// Simple check with a second, later subscriber (currently disabled).
// All the subscribers get the latest value.
/*setTimeout(function(){
  fixedObservable.subscribe(function(value){
   	print('subscriber 2:'+value+' :after '+(new Date()-startTime)/1000+' secs');
   });
},30000);*/
//setTimeout(function(){},)

// Two long-running sources created up front; each one forwards a running
// count of its own ticks into fixedObservable.
var counter1 = 0;
var counter2 = 0;
var dynamicObservable1 = Rx.Observable.interval(4000);
var dynamicObservable2 = Rx.Observable.interval(5000);

// The tick index supplied by interval() is ignored; the local counter is what gets published.
dynamicObservable1.subscribe(function() {
  counter1++;
  fixedObservable.next(counter1 + 'th value from 1');
});
dynamicObservable2.subscribe(function() {
  counter2++;
  fixedObservable.next(counter2 + 'th value from 2');
});

// After 5 s, start a short-lived 1 s-interval source that also feeds
// fixedObservable, and tear it down 2.5 s later (7.5 s from start).
// Observation: 2 values from this stream are received, then it stops.
setTimeout(function() {
  var counter3 = 0;
  var subs = Rx.Observable.interval(1000).subscribe(function() {
    counter3++;
    fixedObservable.next(counter3 + 'th value from 3');
  });
  setTimeout(function() {
    subs.unsubscribe();
  }, 2500);
}, 5000);

// Finally detach subscriber 1 after 50 s.
setTimeout(unsubscribe, 50000);

 <!-- RxJS 5.0.1 bundle from cdnjs; the script references a global `Rx`, presumably provided by this file. -->
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
<!-- Output container looked up by id "result"; print() appends its <p> elements here. -->
<div id="result">

</div> 

解决方案

使用 merge 操作符,我想就不需要 Subject 了。这样的方案适合您吗?

// Each source tags its emissions with an `origin` label so that the merged
// stream can report where a value came from.
const dyn1 = Rx.Observable
  .interval(300)
  .timeInterval()
  .map(({ value }) => ({ value, origin: 'dyn1' }));
const dyn2 = Rx.Observable
  .interval(601)
  .timeInterval()
  .map(({ value }) => ({ value, origin: 'dyn2' }));

// One observable carrying the emissions of both sources; no Subject involved.
const m = Rx.Observable.merge(dyn1, dyn2);
m.subscribe(({ value, origin }) => console.log(value + " from " + origin));

I am trying to create an observable (fixedObservable here) which will be fed by multiple dynamically created observables over time. Whenever a subscription is made, the subscriber should get the most recent value from the fixedObservable. Although I have achieved this, I wanted to know if there is a simpler solution. You can find the code here: https://jsfiddle.net/dzm2t3pa/8/.

Thanks

// Container element (id "result") that print() appends its output paragraphs to.
var resultdiv = document.getElementById('result');

/**
 * Appends `message` to the result container as a new <p> element.
 * The text is assigned through innerHTML, so any markup in it is parsed as HTML.
 */
function print(message) {
  var paragraph = Object.assign(document.createElement('p'), { innerHTML: message });
  resultdiv.appendChild(paragraph);
}
// Reference timestamp; the subscriber log reports elapsed seconds relative to it.
var startTime = new Date();

// Single long-lived Subject that every dynamically created observable pushes into.
// A plain Subject does not replay: a subscriber only sees values emitted while it is subscribed.
var fixedObservable = new Rx.Subject();
// Handle of the current subscription; assigned in subscribe() and released in unsubscribe().
var subscription;

/**
 * Attaches "subscriber 1" to fixedObservable and stores the resulting
 * subscription handle in `subscription` so that unsubscribe() can release it.
 * Every received value is printed together with the seconds elapsed since startTime.
 */
function subscribe() {
  var logValue = function(value) {
    var elapsedSecs = (new Date() - startTime) / 1000;
    print('subscriber 1:' + value + ' :after ' + elapsedSecs + ' secs');
  };
  subscription = fixedObservable.subscribe(logValue);
}

/** Releases the subscription most recently stored by subscribe(). */
function unsubscribe() {
  var active = subscription;
  active.unsubscribe();
}
// Subscribe to fixedObservable (i.e. the Subject) initially.
subscribe()

// Unsubscribe after 11 s and resubscribe after 22 s to see what happens
// to the values emitted in between.
// Observation: the values generated in that period are not received,
// although the latest one is wanted on resubscription.
setTimeout(unsubscribe, 11000);
setTimeout(subscribe, 22000);

// Simple check with a second, later subscriber (currently disabled).
// All the subscribers get the latest value.
/*setTimeout(function(){
  fixedObservable.subscribe(function(value){
   	print('subscriber 2:'+value+' :after '+(new Date()-startTime)/1000+' secs');
   });
},30000);*/
//setTimeout(function(){},)

// Two long-running sources created up front; each one forwards a running
// count of its own ticks into fixedObservable.
var counter1 = 0;
var counter2 = 0;
var dynamicObservable1 = Rx.Observable.interval(4000);
var dynamicObservable2 = Rx.Observable.interval(5000);

// The tick index supplied by interval() is ignored; the local counter is what gets published.
dynamicObservable1.subscribe(function() {
  counter1++;
  fixedObservable.next(counter1 + 'th value from 1');
});
dynamicObservable2.subscribe(function() {
  counter2++;
  fixedObservable.next(counter2 + 'th value from 2');
});

// After 5 s, start a short-lived 1 s-interval source that also feeds
// fixedObservable, and tear it down 2.5 s later (7.5 s from start).
// Observation: 2 values from this stream are received, then it stops.
setTimeout(function() {
  var counter3 = 0;
  var subs = Rx.Observable.interval(1000).subscribe(function() {
    counter3++;
    fixedObservable.next(counter3 + 'th value from 3');
  });
  setTimeout(function() {
    subs.unsubscribe();
  }, 2500);
}, 5000);

// Finally detach subscriber 1 after 50 s.
setTimeout(unsubscribe, 50000);

<!-- RxJS 5.0.1 bundle from cdnjs; the script references a global `Rx`, presumably provided by this file. -->
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script>
<!-- Output container looked up by id "result"; print() appends its <p> elements here. -->
<div id="result">

</div>

解决方案

Use merge; there is no need for a Subject, I guess. Is something like this OK for you?

// Each source tags its emissions with an `origin` label so that the merged
// stream can report where a value came from.
const dyn1 = Rx.Observable
  .interval(300)
  .timeInterval()
  .map(({ value }) => ({ value, origin: 'dyn1' }));
const dyn2 = Rx.Observable
  .interval(601)
  .timeInterval()
  .map(({ value }) => ({ value, origin: 'dyn2' }));

// One observable carrying the emissions of both sources; no Subject involved.
const m = Rx.Observable.merge(dyn1, dyn2);
m.subscribe(({ value, origin }) => console.log(value + " from " + origin));

这篇关于RXJS:动态创建的可观察对象中的单个可观察对象的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆