将未知数量的可观察对象与 RxJS 合并 [英] Merging unknown number of observables with RxJS

查看:63
本文介绍了将未知数量的可观察对象与 RxJS 合并的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我目前正在尝试创建一个小项目来演示使用 RxJS 进行响应式编程.目标是向我的同事展示这个东西就在那里,值得研究.我对框架没有经验,所以这让事情变得复杂.

I am currently trying to create a small project to demo Reactive Programming with RxJS. The goal is to show my colleagues that this thing is out there, and it is worth looking into. I'm not experienced with the framework, so that makes things complicated.

我正在尝试扩展我的另一个演示以使用 RxJS.不是一个很复杂的demo,基本上我可以添加任意数量的小表格,得到一个小公式计算出来的数字,还有一个按钮,可以汇总所有表格的值.

I am trying to expand another demo of mine to utilize RxJS. It is not a very complicated demo, basically I can add any number of small forms, which result in a number calculated by a small formula, and there is a button, which sums up the values of all the forms.

在表格中计算公式很容易,但我想我可以走得更远.

Getting the formula calculate inside the forms is easy, but I figured I could go further.

我希望通过合并的 observable 自动完成求和运算.我想出的唯一解决方案是:

I want to have the sum operation done automatically through a merged observable. The only solution I figured out is something along these lines:

//dummy observables
var s1 = Rx.Observable.Interval(100);
var s2 = Rx.Observable.Interval(200);

//Combine the observables
var m = s1.combineLatest(s2, function(x,y){return x+y});

//Subscribe to the combined observable
var sub = m.subscribe(function(x){console.log(x)});
//A new observable is created
var s3 = Rx.Observable.Interval(300);
//Now I need to update all my subscriptions, wich is a pain.
m = m.combine(s3, function(x,y){return x+y});
sub.dispose();
sub=m.subscribe(function(x){console.log(x)});

我想我可以得到另一个 observable 来通知我的订阅者自我更新 - 因为知道我的所有订阅者的工作方式会使整个架构变得毫无用处,但这对于这样的任务来说听起来有点矫枉过正,而且我不只是意思是演示,我真的无法想象有一个每一天"现实世界的例子,像这样的架构将使事情变得更干净,而不仅仅是观察任何变化,并主动"获取计算值.来自我的表格.

I figured I could get another observable to notify my subscriptions to update themselves - as knowing how all my subscribers work would render the whole architecture useless, but this sounds like an overkill for a task like this, and I don't just mean the demo, I can't really imagine to have an "every day" real world example where an architecture like this would make things cleaner than just watching for any change, and getting the calculated values "actively" from my forms.

我可能会在处理表单的模块中主动获取和求和值,并让m"外部世界可以观察到订阅,将我的价值观从模块内部推入其中.

I'd probably do the active getting, and summing of values inside the module handling the forms, and have the "m" observable for the outside world to subscribe to, pushing my values into it from inside the module.

这是正确的方法吗?我认为是的,因为它们归我的模块所有,我应该完全控制发生在它们身上的事情,但我真的很想知道更有经验的人对此的看法.

Would this be a correct approach? I'd think yes, because they are owned by my module, I'm supposed to have full control over what is happening to them, but I'm really interested in what more experienced people think about this.

推荐答案

我知道我参加这个聚会迟到了,但我刚刚遇到了同样的问题,并且找不到任何合适的答案.我想出了如何在我的应用程序上下文中执行此操作,因此除非这对您来说仍然是一个悬而未决的问题,否则我将在此处发布答案以供后代使用.

I know I'm late to this party, but I just had this same exact question, and could not find any suitable answer. I figured out how to do this in the context of my application, so unless this is still an open issue for you, I'll post the answer here for posterity.

为了解决我的问题,我有一个带有许多控件的 Angular 表单.每当任何一个控件的验证状态发生变化时,我都需要做一些事情.但是,控件可以在以后动态添加到表单中,而且我仍然需要监听这些单独控件的验证更改.

To set up my problem, I have an Angular form with a number of controls. Anytime the validation state changes on any one of the controls, I need to do something. However, controls can be dynamically added to the form later, and I still need to listen for validation changes on those individual controls as well.

我将尝试稍微改变我的方法以适应问题的上下文.基本上,是的,正确的方法是在内部 Observable 更新订阅时使用外部 Observable 信号.这是可以接受的架构,坦率地说,这也是 Reactive Extensions 如此强大的原因之一.下面是我如何使用 RxJS 6.2.2 和 TypeScript 3.0.1 做到的.请注意,在提出此问题或之前回答此问题时,此方法可能不可用.

I'll try to alter my approach a little to fit the question's context. Basically, yes the right approach here is to have an outer Observable signal when to have the inner Observables update their subscriptions. This is acceptable architecture and frankly is one of the reasons Reactive Extensions is so powerful. Here's how I did it using RxJS 6.2.2 and TypeScript 3.0.1. Note that at the time this question was asked, or previously answered, this method might not have been available.

import { map, mergeMap, delay } from 'rxjs/operators';
import { interval, Subject, Observable } from 'rxjs';

// utility function to create sample observables
const createObs = (time: number, i: string) => interval(time).pipe(map(val => `${i}: ${val}`))

// create static sample observables. 
// these could represent the value changes of known form elements
const obsA = createObs(1000, 'A'); // emit every 1 second
const obsB = createObs(2500, 'B'); // emit every 2.5 seconds

// create a Subject which will represent dynamically adding a new stream of form values to the calculation
const formChanges = new Subject<Observable<string>>();

// this will hold the accumulated results
const results: string[] = [];

// subscribe to the subject
// each time any of the inner observables emits a new value, this will log the latest value of all existing inner observables
formChanges.pipe(
  mergeMap(innerObs => innerObs, (outerValue, innerValue, outerIndex, innerIndex) => {
    results[outerIndex] = innerValue;
    return results;
  })
).subscribe(console.log);

// emit the known form value change streams
formChanges.next(obsA);
formChanges.next(obsB);

// this will represent adding some other stream of values to the calculation later
formChanges.next(createObs(1750, 'C').pipe(delay(5000))) // after 5 seconds, emit every 1.75 seconds

// Output
// [ 'A: 0' ]
// [ 'A: 1' ]
// [ 'A: 1', 'B: 0' ]
// [ 'A: 2', 'B: 0' ]
// [ 'A: 3', 'B: 0' ]
// [ 'A: 3', 'B: 1' ]
// [ 'A: 4', 'B: 1' ]
// [ 'A: 5', 'B: 1' ]
// [ 'A: 5', 'B: 1', 'C: 0' ]
// [ 'A: 6', 'B: 1', 'C: 0' ]
// [ 'A: 6', 'B: 2', 'C: 0' ]
// [ 'A: 7', 'B: 2', 'C: 0' ]
// [ 'A: 7', 'B: 2', 'C: 1' ]
// [ 'A: 8', 'B: 2', 'C: 1' ]

这篇关于将未知数量的可观察对象与 RxJS 合并的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆