如何将 Observable 数组与 RxJS 6.x 和 Node.js 合并? [英] How do I merge an array of Observables with RxJS 6.x and Node.js?

查看:48
本文介绍了如何将 Observable 数组与 RxJS 6.x 和 Node.js 合并?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

出于学习目的,我正在创建一个 Node 应用程序,该应用程序需要从数组中获取 x 个 RxJS observable 并组合成单个事件流.我想知道事件何时以任何可观察的方式发生,以任何顺序(不以任何顺序或完全完成).我觉得它应该在一个合并的事件流中.基本上,来自任何 observable 的第一个事件将完成.

For learning purposes, I'm creating a Node app that will need to take x RxJS observables from an array and combine into a single stream of events. I want to know when events occur in any observable, in any order (not in any sequence or full completion). I feel it should be in a single merged stream of events. Basically, the first event that comes through from any observable will complete.

为此,我觉得 merge() 可以解决问题.由于merge 不直接将数组作为参数,因此到目前为止我使用reduce 来帮助合并.

For this, I felt merge() will do the trick. As merge doesn't take arrays directly as a parameter, I'm using reduce so far to help merge.

然而,最终的结果不是一个可观察的,而是一个函数.我也订阅不了代码的简化版本可以在下面看到.

However, the end result is not an observable, but a function. I can't subscribe to it either. A simplified version of the code can be seen below.

如何更改此 Node 10.14.2、RxJS 6.4.x 代码以返回可观察对象而不是我可以添加 .subscribe() 的[function]"?

How can I alter this Node 10.14.2, RxJS 6.4.x code to return an observable and not a "[function]" that I can tack a .subscribe() to?

const { Observable } = require('rxjs');
const { merge } = require('rxjs/operators');

const observables = [
    Observable.create(observer => observer.next('Hello')),
    Observable.create(observer => observer.next('Hello')),
    Observable.create(observer => observer.next('Hello'))
];

const mergedObservables = observables.reduce((merged, observable) => {
    console.log(observable);
    return merge(merged, observable);
});

// outputs:
// Observable { _isScalar: false, _subscribe: [Function] }
// Observable { _isScalar: false, _subscribe: [Function] }

console.log(mergedObservables);

// outputs:
// [Function]

mergedObservables.subscribe();
// error:
// TypeError: mergedObservables.subscribe is not a function

推荐答案

您正在导入 merge operator 而不是静态的 merge 功能.前者对来自源 observable 的事件进行操作,而后者从一个或多个源 observable 创建一个新的 observable.虽然考虑下面的扩展语法可以简化您的代码,但这并不是您真正的问题.

You're importing the merge operator as opposed to the static merge function. The former operates on events from a source observable while the latter creates a new observable from one or more source observables. While the thought about the spread syntax below simplifies your code, it wasn't your real issue.

出现Node.js >= 5.0 支持函数调用中数组的展开运算符(不过,您没有指定您使用的是哪个版本的 Node.js).如果您使用的是现代版本的 Node.js,以下内容应该可以工作:

It appears Node.js >= 5.0 supports the spread operator for arrays in function calls (you don't specify which version of Node.js you're using, though). The following should work if you're using a modern version of Node.js:

const { Observable, merge } = require('rxjs')

const observables = [
    Observable.create(observer => observer.next('Hello')),
    Observable.create(observer => observer.next('Hello')),
    Observable.create(observer => observer.next('Hello'))
]

const mergedObservables = merge(...observables)
mergedObservables.subscribe(event => { console.log(event) })

这篇关于如何将 Observable 数组与 RxJS 6.x 和 Node.js 合并?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆