理解 RXJS 可观察输出的问题 [英] Issue with understanding RXJS Observable Output

查看:54
本文介绍了理解 RXJS 可观察输出的问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我对 rxjs 还很陌生,正在努力学习.

我正在阅读这篇中等文章 并试图理解这个 RXJS 输出:

从'rxjs-es'导入{Observable};让输出 = Observable.interval(500).map(i => [1,2,3,4,5,6][i]);让结果 = output.map(num1 => num1).filter(num1 => num1 > 4).reduce((num1, num2) => num1 + num2);result.subscribe(number => console.log(number));

<块引用>

输出:27

我不明白输出结果是 27 以及 reduce 是如何工作的(以及 reducer 的这两个参数是什么).

有人能告诉我发生了什么吗?(我尝试在 codeandbox 上运行它,但是当我运行此代码时它会抛出错误)

解决方案

了解 FilterReduce 运算符

正如其他人在评论中提到的,您引用的文章使用的是具有不同语法的旧版 rxjs.对于此示例,我将使用从版本 6 开始的较新语法.

在 rxjs 中,有多种运算符可用于转换通过流发出的值.通常这些是导入的:

import { filter, reduce } from 'rxjs/operators';

还有许多生成器函数可用于创建值流.interval 是这些函数之一,它将创建一个每 n 毫秒发出连续整数的流.导入如下:

import { interval } from 'rxjs';

让我们创建一个简单的流:

number$ = interval(1000);//每 1 秒发出一次数字//输出:0, 1, 2, 3, 4, 5...

我们可以将运算符应用于此流以转换排放:

filter 的使用非常简单.它只是发出通过给定真值测试的值(与 Array.filter() 方法完全一样).

numbersLessThan4$ = numbers$.pipe(过滤器(数字 => 数字 <4));//输出:0, 1, 2, 3

reduce 运算符稍微复杂一些,其行为就像 Array.reduce() 方法.一个函数应用于每个发出的值,并能够存储一个值,在评估下一次发出时可以引用该值.

reduce 需要两个参数.第一个是接收当前发射 (cur) 和先前累积结果 (acc) 并返回新累积值的函数.第二个是 acc 的初始值.

示例:

sumOfNumbers$ = numbers$.pipe(reduce((acc, cur) => acc + cur, 0));

那么,让我们看看当 numbers$ 发出前 3 个数字时 reduce 做了什么:

  • 0
    • cur 接收当前的发射值 0
    • acc 以提供的默认 0
    • 开头
    • 表达式acc + cur 返回0
  • 1
    • cur 接收当前的发射值 1
    • acc 接收之前返回的值 0
    • 表达式acc + cur 返回1
  • 2
    • cur 接收当前发射值 2
    • acc 接收之前返回的值 1
    • 表达式acc + cur 返回3

所以这很酷.我们可以在一行简单的代码中得到相当多的逻辑.关于 reduce 的一件重要的事情是它在源 observable 完成之前不会发出.目前,numbers$ 永远不会完成(interval() 无限期地发出连续整数).

我们可以使用 take() 操作符在发出一定数量的值后完成流.

示例:

numbers$ = interval(1000).pipe(take(5));//在 5 次排放后完成sumOfNumbers$ = numbers$.pipe(//接收 5 个值(0、1、2、3、4)并执行上述逻辑.reduce((acc, cur) => acc + cur, 0));//输出:10

可以使用多个运算符来转换排放.只需在 pipe() 中提供多个:

sumOfNumbersLessThan4$ = numbers$.pipe(过滤器(数字 => 数字 <4),reduce((acc, cur) => acc + cur, 0));//输出:6

I'm fairly new to rxjs and trying to learn.

I was reading through this medium article and trying to understand this RXJS output:

import {Observable} from 'rxjs-es';
let output = Observable.interval(500)
             .map(i => [1,2,3,4,5,6][i]);

let result = output.map(num1 => num1)
    .filter(num1 => num1 > 4)
    .reduce((num1, num2) => num1 + num2);

result.subscribe(number => console.log(number));

Output: 27

I don't understand how the output turned out to be 27 and how that reduce is working (and what are those two arguments for the reducer).

Can someone enlighten me what's happening? (I've tried to run it on codesandbox however it's throwing an error when I run this code)

解决方案

Understanding Filter and Reduce Operators

As others have mentioned in the comments, the article you refer to is using an older version of rxjs that has a different syntax. For this example I'll use the newer syntax that started in version 6.

In rxjs, there are various operators available to transform the values emitted through a stream. Typically these are imported like:

import { filter, reduce } from 'rxjs/operators';

There are also many generator functions available to create a stream of values. interval is one of these functions that will create a stream that emits sequential integers every n milliseconds. Imported like:

import { interval } from 'rxjs';

Let's create a simple stream:

number$ = interval(1000); // emit number every 1 second
// output: 0, 1, 2, 3, 4, 5...

We can apply operators to this stream to transform the emissions:

The usage of filter is pretty simple. It simply emits values that pass the given truth test (exactly like the Array.filter() method).

numbersLessThan4$ = numbers$.pipe(
    filter(number => number < 4)
);
// output: 0, 1, 2, 3

The reduce operator is a bit more complex and behaves just like the Array.reduce() method. A function is applied to each emitted value and is able store a value that can be referenced when the next emission is evaluated.

reduce takes two parameters. The first is a function that receives the current emission (cur) and the previous accumulated result (acc) and returns a new accumulated value. The second is an initial value for acc.

example:

sumOfNumbers$ = numbers$.pipe(
    reduce((acc, cur) => acc + cur, 0)
);

So, let's look what reduce does when numbers$ emits the first 3 numbers:

  • 0
    • cur receives the current emission value 0
    • acc starts with the provided default 0
    • the expression acc + cur returns 0
  • 1
    • cur receives the current emission value 1
    • acc receives the previously returned value 0
    • the expression acc + cur returns 1
  • 2
    • cur receives the current emission value 2
    • acc receives the previously returned value 1
    • the expression acc + cur returns 3

So this is cool. We can get quite a lot of logic into a simple line of code. One important thing about reduce is that it will not emit until the source observable completes. Currently, numbers$ never completes (interval() emits sequential integers indefinitely).

We can use the take() operator to complete the stream after a certain number of values are emitted.

Example:

numbers$ = interval(1000).pipe(take(5)); // completes after 5 emissions

sumOfNumbers$ = numbers$.pipe(
    // receives 5 values (0, 1, 2, 3, 4) and performs the logic described above.
    reduce((acc, cur) => acc + cur, 0) 
);
// output: 10

Multiple operators can be used to transform the emissions. Simply provide multiple inside the pipe():

sumOfNumbersLessThan4$ = numbers$.pipe(
    filter(number => number < 4),
    reduce((acc, cur) => acc + cur, 0) 
);
// output: 6

这篇关于理解 RXJS 可观察输出的问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆