RxJS MergeMap如何工作? [英] How Does RxJS MergeMap Work?

查看:189
本文介绍了RxJS MergeMap如何工作?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我根本不理解 mergeMap 的目的。我听过两个解释:

I don't understand the purpose of mergeMap at all. I have heard two "explanations:


  1. 它就像LINQ中的SelectAll - nope。

  2. 好吧,它是RxJS merge map 的组合 - nope(或者我不能复制这个)。

  1. "It's like SelectAll" in LINQ - nope.
  2. "Well, it's a combination of RxJS merge and map" - nope (or I can't replicate this).

请考虑以下代码:

    var obs1 = new Rx.Observable.interval(1000);
    var obs2 = new Rx.Observable.interval(1000);

    //Just a merge and a map, works fine
    obs1.merge(obs2).map(x=> x+'a').subscribe(
      next => console.log(next)
    )

    //Who know what - seems to do the same thing as a plain map on 1 observable
    obs1.mergeMap(val => Rx.Observable.of(val + `B`))
        .subscribe(
          next => console.log(next)
        )

JS Bin

标有谁知道什么的最后一篇文章只做 obs1 上的地图 - 有什么意义?

The last piece labelled "Who knows what" does nothing more than a map on obs1 - what's the point?

mergeMap 实际上做了什么?什么是有效用例的示例? (最好带一些代码)

What does mergeMap actually do? What is an example of a valid use case? (Preferably with some code)

根本没有帮助我的文章(上面的mergeMap代码来自其中一个): 1 2

Articles that didn't help me at all (mergeMap code from above is from one of these): 1, 2

推荐答案

tl ; dr; mergeMap map 更强大。理解 mergeMap 是访问Rx全部功率的必要条件。

tl;dr; mergeMap is way more powerful than map. Understanding mergeMap is the necessary condition to access full power of Rx.


  • mergeMap map 作用于单个流(与 zip combineLatest

  • both mergeMap and map acts on a single stream (vs. zip, combineLatest)

mergeMap map 可以转换流的元素(与过滤器延迟

both mergeMap and map can transform elements of a stream (vs. filter, delay)


  • 无法更改源流的大小(假设: map 本身不会 throw );对于源中的每个元素,只发出一个映射元素; map 不能忽略元素(例如 filter );

  • cannot change size of the source stream (assumption: map itself does not throw); for each element from source exactly one mapped element is emitted; map cannot ignore elements (like for example filter);

在默认调度程序的情况下,转换同步发生; 100%清除:源流可以异步传递其元素,但每个下一个元素立即映射并进一步重新发出; map 无法及时转换元素,例如延迟

in case of the default scheduler the transformation happens synchronously; to be 100% clear: the source stream may deliver its elements asynchronously, but each next element is immediately mapped and re-emitted further; map cannot shift elements in time like for example delay

对返回值没有限制

id x = > x


  • 可以更改源流的大小;对于每个元素,可能有任意数量(0,1或多个)创建/发出的新元素

  • can change size of the source stream; for each element there might be arbitrary number (0, 1 or many) of new elements created/emitted

它提供对异步性的完全控制 - 两者都是新元素是否创建/发出源流中应该同时处理多少个元素;例如,假设源流发出10个元素,但 maxConcurrency 设置为2,那么将立即处理两个第一个元素,其余8个缓冲;一旦处理完完成 d之后,源流中的下一个元素将被处理,依此类推 - 这有点棘手,但请看下面的例子

it offers full control over asynchronicity - both when new elements are created/emitted and how many elements from the source stream should be processed concurrently; for example assume source stream emitted 10 elements but maxConcurrency is set to 2 then two first elements will be processed immediately and the rest 8 buffered; once one of the processed completed the next element from source stream will be processed and so on - it is bit tricky, but take a look at the example below

所有其他运算符只需 mergeMap Observable 构造函数

all other operators can be implemented with just mergeMap and Observable constructor

可用于递归异步操作

返回值必须为Observable类型(或者Rx必须知道如何从中创建observable - 例如promise,array)

return values has to be of Observable type (or Rx has to know how to create observable out of it - e.g. promise, array)

id x => Rx.Observable.of(x)

let array = [1,2,3]
fn             map                    mergeMap
x => x*x       [1,4,9]                error /*expects array as return value*/
x => [x,x*x]   [[1,1],[2,4],[3,9]]    [1,1,2,4,3,9]

该类比并未显示完整图片,它基本上对应于 .mergeMap maxConcurrency 设置为1.在这种情况下,元素将按上面的顺序排序,但一般情况下不必如此。我们唯一的保证是新元素的排放将按其在基础流中的位置排序。例如: [3,1,2,4,9,1] [2,3,1,1,9,4] 有效,但 [1,1,4,2,3,9] 不是(因为 4 在基础流中的 2 之后发出。

The analogy does not show full picture and it basically corresponds to .mergeMap with maxConcurrency set to 1. In such a case elements will be ordered as above, but in general case it does not have to be so. The only guarantee we have is that emission of new elements will be order by their position in the underlying stream. For example: [3,1,2,4,9,1] and [2,3,1,1,9,4] are valid, but [1,1,4,2,3,9] is not (since 4 was emitted after 2 in the underlying stream).

// implement .map with .mergeMap
Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
  return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
}

Rx.Observable.range(1, 3)
  .mapWithMergeMap(x => x * x)
  .subscribe(x => console.log('mapWithMergeMap', x))

// implement .filter with .mergeMap
Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
  return this.mergeMap(x =>
    filterFn(x) ?
    Rx.Observable.of(x) :
    Rx.Observable.empty()); // return no element
}

Rx.Observable.range(1, 3)
  .filterWithMergeMap(x => x === 3)
  .subscribe(x => console.log('filterWithMergeMap', x))

// implement .delay with .mergeMap 
Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
  return this.mergeMap(x =>
    Rx.Observable.create(obs => {
      // setTimeout is naive - one should use scheduler instead
      const token = setTimeout(() => {
        obs.next(x);
        obs.complete();
      }, delayMs)
      return () => clearTimeout(token);
    }))
}

Rx.Observable.range(1, 3)
  .delayWithMergeMap(500)
  .take(2)
  .subscribe(x => console.log('delayWithMergeMap', x))

// recursive count
const count = (from, to, interval) => {
  if (from > to) return Rx.Observable.empty();
  return Rx.Observable.timer(interval)
    .mergeMap(() =>
      count(from + 1, to, interval)
      .startWith(from))
}

count(1, 3, 1000).subscribe(x => console.log('count', x))

// just an example of bit different implementation with no returns
const countMoreRxWay = (from, to, interval) =>
  Rx.Observable.if(
    () => from > to,
    Rx.Observable.empty(),
    Rx.Observable.timer(interval)
    .mergeMap(() => countMoreRxWay(from + 1, to, interval)
      .startWith(from)))

const maxConcurrencyExample = () =>
  Rx.Observable.range(1,7)
    .do(x => console.log('emitted', x))
    .mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
    .do(x => console.log('processed', x))
    .subscribe()

setTimeout(maxConcurrencyExample, 3100)

<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>

这篇关于RxJS MergeMap如何工作?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆