如何将 Observable 数组与 RxJS 6.x 和 Node.js 合并? [英] How do I merge an array of Observables with RxJS 6.x and Node.js?
问题描述
出于学习目的,我正在创建一个 Node 应用程序,该应用程序需要从数组中获取 x 个 RxJS observable 并组合成单个事件流.我想知道事件何时以任何可观察的方式发生,以任何顺序(不以任何顺序或完全完成).我觉得它应该在一个合并的事件流中.基本上,来自任何 observable 的第一个事件将完成.
For learning purposes, I'm creating a Node app that will need to take x RxJS observables from an array and combine into a single stream of events. I want to know when events occur in any observable, in any order (not in any sequence or full completion). I feel it should be in a single merged stream of events. Basically, the first event that comes through from any observable will complete.
为此,我觉得 merge() 可以解决问题.由于merge 不直接将数组作为参数,因此到目前为止我使用reduce 来帮助合并.
For this, I felt merge() will do the trick. As merge doesn't take arrays directly as a parameter, I'm using reduce so far to help merge.
然而,最终的结果不是一个可观察的,而是一个函数.我也订阅不了代码的简化版本可以在下面看到.
However, the end result is not an observable, but a function. I can't subscribe to it either. A simplified version of the code can be seen below.
如何更改此 Node 10.14.2、RxJS 6.4.x 代码以返回可观察对象而不是我可以添加 .subscribe() 的[function]"?
How can I alter this Node 10.14.2, RxJS 6.4.x code to return an observable and not a "[function]" that I can tack a .subscribe() to?
const { Observable } = require('rxjs');
const { merge } = require('rxjs/operators');
const observables = [
Observable.create(observer => observer.next('Hello')),
Observable.create(observer => observer.next('Hello')),
Observable.create(observer => observer.next('Hello'))
];
const mergedObservables = observables.reduce((merged, observable) => {
console.log(observable);
return merge(merged, observable);
});
// outputs:
// Observable { _isScalar: false, _subscribe: [Function] }
// Observable { _isScalar: false, _subscribe: [Function] }
console.log(mergedObservables);
// outputs:
// [Function]
mergedObservables.subscribe();
// error:
// TypeError: mergedObservables.subscribe is not a function
推荐答案
您正在导入 merge
operator 而不是静态的 merge代码> 功能.前者对来自源 observable 的事件进行操作,而后者从一个或多个源 observable 创建一个新的 observable.虽然考虑下面的扩展语法可以简化您的代码,但这并不是您真正的问题.
You're importing the merge
operator as opposed to the static merge
function. The former operates on events from a source observable while the latter creates a new observable from one or more source observables. While the thought about the spread syntax below simplifies your code, it wasn't your real issue.
出现Node.js >= 5.0 支持函数调用中数组的展开运算符(不过,您没有指定您使用的是哪个版本的 Node.js).如果您使用的是现代版本的 Node.js,以下内容应该可以工作:
It appears Node.js >= 5.0 supports the spread operator for arrays in function calls (you don't specify which version of Node.js you're using, though). The following should work if you're using a modern version of Node.js:
const { Observable, merge } = require('rxjs')
const observables = [
Observable.create(observer => observer.next('Hello')),
Observable.create(observer => observer.next('Hello')),
Observable.create(observer => observer.next('Hello'))
]
const mergedObservables = merge(...observables)
mergedObservables.subscribe(event => { console.log(event) })
这篇关于如何将 Observable 数组与 RxJS 6.x 和 Node.js 合并?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!