mergeMap 运算符究竟是如何工作的,在什么情况下使用它? [英] How exactly does the mergeMap operator work and in which cases is it used?

查看:53
本文介绍了mergeMap 运算符究竟是如何工作的,在什么情况下使用它?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

来之前看过Rxjs的官方文档和其他一些页面,但还是不太清楚.我的理解是这样的:

Before coming here I have read the official documentation of Rxjs and some other pages but I am still not clear. What I understood is this:

它用于加入"2个observable,从而获得一个observable,我也看到它用于展平"一个observable(我也不是很清楚).

It is used to "join" 2 observables and thus obtain a single observable as a result, I also saw that it is used to "flatten" an observable (I am also not very clear).

现在......我有几天时间尝试使用 Angular 和 Node.js 和 Express 编写用户注册表,我发现了一个我决定使用的小教程,它有以下代码:

Now ... I have days trying to program a user registry using Angular and Node.js with Express and I found a little tutorial which I decided to use and it has this code:

import { Injectable, Injector } from '@angular/core';
import { HttpClient, HttpInterceptor, HttpRequest, HttpHandler, HttpEvent, HttpErrorResponse } from '@angular/common/http';
import { Observable, throwError } from 'rxjs';
import { catchError, retry, mergeMap } from 'rxjs/operators'
import { AuthenticationService } from './authentication.service';

@Injectable({
	providedIn: 'root'
})
export class AppInterceptor implements HttpInterceptor {

	constructor(private injector: Injector) { }

	intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
		let accessToken = "", refreshToken = ""

		const tokens = JSON.parse(sessionStorage.getItem("tokens"))
		if (tokens) {
			accessToken = tokens.accessToken
			refreshToken = tokens.refreshToken
		}

		let clonHttp: HttpRequest<any> 
		clonHttp = tokens ? req.clone({ headers: req.headers.append("Authorization", `Bearer ${accessToken}`) }) : req
		
		let auth = this.injector.get(AuthenticationService);

		return next.handle(clonHttp)
			.pipe(
				catchError((error: HttpErrorResponse) => {
					if (error.error instanceof ErrorEvent) {
						console.log("error event")
					} else if (error.status == 401) {
						return auth.getNewAccessToken(refreshToken)
							.pipe(
								retry(3),
								mergeMap(
									(response: any) => {
										tokens.accessToken = response.accessToken
										sessionStorage.setItem("tokens", JSON.stringify(tokens))

										clonHttp = req.clone({ headers: req.headers.append("Authorization", `Bearer ${response.accessToken}`) })
										return next.handle(clonHttp)
									}
								)
							)
					} else if (error.status == 409) {
						return throwError("User not logged")
					} else {
						if (error.error && error.error.message) {
							return throwError(error.error.message)
						} else {
							return throwError("Check your connection")
						}
					}
				})
			)
	}

}

如果您看到,当您使用 MergeMap 运算符时,它们只会向您传递答案(一个可观察对象),或者至少这是我所看到的.我想说的是,我没有看到他们将它与 2 个 observables 一起使用或混合 2 个 observables,这是我在他们的官方文档中读到的,事实上,在他们展示的示例中,他们总是使用它有 2 个 observables.

If you see, when you use the MergeMap operator they only pass you the answer (a single observable), or at least that's what I can see. What I'm trying to say is that I don't see that they are using it with 2 observables or to mix 2 observables, which is what I have read in their official documentation, in fact, in the examples they show they always use it with 2 observables.

老实说,我理解这个运算符太困难了,如果有人能帮助我以简单的方式理解它,除了理解它在我之前展示的代码中的用法之外,我将非常感激.提前问好.谢谢!

Honestly it has been too difficult for me to understand this operator, if someone could help me understand it in a simple way, I would be extremely grateful, in addition to understanding its use in that code that I show earlier. Greetings in advance. Thank you!

推荐答案

mergeMap 像许多其他所谓的高阶映射运算符一样,维护一个或多个内部可观察对象.

mergeMap, like many other so-called higher order mapping operators, maintains one or multiple inner observables.

内部可观察是用外部值提供的函数创建的.外部价值本质上就是从其来源接收到的价值.例如:

An inner observable is created with the outer value and the provided function. The outer value essentially is just the value received from its source. For example:

of(1, 2, 3).pipe(
  mergeMap((outerValue, index) => /* ... return an observable ... */)
).subscribe(); // `outerValue`: 1, 2, 3 (separately)

当外部值进来时,将创建一个新的内部可观察对象.我认为理解这一点的最好方法是查看 源代码:

When an outer value comes in, a new inner observable will be created. I think the best way to understand this is to have a look at the source code:

// `value` - the `outerValue`
protected _next(value: T): void {
  if (this.active < this.concurrent) {
    this._tryNext(value);
  } else {
    this.buffer.push(value);
  }
}

protected _tryNext(value: T) {
  let result: ObservableInput<R>;
  const index = this.index++;
  try {
    // Create the inner observable based on the `outerValue` and the provided function (`this.project`)
    // `mergeMap(project)`
    result = this.project(value, index);
  } catch (err) {
    this.destination.error(err);
    return;
  }
  this.active++;
  // Subscribe to the inner observable
  this._innerSub(result, value, index);
}

暂时先忽略concurrentbuffer,我们稍后再看.

Please disregard for now concurrent and buffer, we'll have a look at them a bit later.

现在,当内部 observable 发出时会发生什么?在进一步讨论之前,值得一提的是,虽然很明显,内部可观察 需要一个内部订阅者.我们可以在上面的 _innerSub 方法中看到这一点:

Now, what happens when an inner observable emits ? Before going any further, it's worth mentioning that, although it's obvious, an inner observable requires an inner subscriber. We can see this in the _innerSub method from above:

private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
  const innerSubscriber = new InnerSubscriber(this, value, index);
  const destination = this.destination as Subscription;
  destination.add(innerSubscriber);
  // This is where the subscription takes place
  subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
}

当内部 observable 发出时,notifyNext 方法 将被调用:

When an inner observable emits, the notifyNext method will be called:

notifyNext(outerValue: T, innerValue: R,
            outerIndex: number, innerIndex: number,
            innerSub: InnerSubscriber<T, R>): void {
  this.destination.next(innerValue);
}

目的地指向链中的下一个订阅者.例如,它可以是这样的:

Where destination points to the next subscriber in the chain. For example, it can be this:

of(1)
  .pipe(
    mergeMap(/* ... */)
  )
  .subscribe({} /* <- this is the `destination` for `mergeMap` */)

这将在下面的链中的下一个订阅者中更详细地解释.

This will be explained in more detail in What about the next subscriber in the chain below.

那么,混合两个 observables 是什么意思?

让我们看看这个例子:

of(2, 3, 1)
  .pipe(
    mergeMap(outerValue => timer(outerValue).pipe(mapTo(outerValue)))
  )
  .subscribe(console.log)
  /* 1 \n 2 \n 3 */

2 到达时,mergeMap 将订阅一个将在 200ms 内发出的内部 observable.这是一个异步操作,但请注意外部值 (2, 3, 1) 同步到达.接下来,3 到达并将创建一个内部 obs.这将在 300 毫秒内发出.由于当前脚本还没有执行完毕,所以还没有考虑回调队列.现在 1 到达,将创建一个内部 obs.将在 100 毫秒内发出.

When 2 arrives, mergeMap will subscribe to an inner observable that will emit in 200ms. This is an asynchronous action, but notice that the outer values(2, 3, 1) arrive synchronously. Next, 3 arrives and will create an inner obs. that will emit in 300ms. Since the current script has not finished executing yet, the callback queue is not yet considered. Now 1 arrives, and will create an inner obs. that will emit in 100 ms.

mergeMap 现在有 3 个内部 observable,并将传递任何内部 observable 发出的内部值.
正如预期的那样,我们得到 123.

mergeMap has now 3 inner observables and will pass along the inner value of whichever inner observable emits.
As expected, we get 1, 2, 3.

这就是 mergeMap 所做的.Mixing observables 可以这样想:如果外部值来了并且内部 observable 已经创建,那么 mergeMap 只是说:没问题,我会只需创建一个新的内部 obs.并订阅它".

So that's what mergeMap does. Mixing observables can be thought of this way: if an outer value comes and an inner observable has already been created, then mergeMap simply says: "no problem, I'll just create a new inner obs. and subscribe to it".

mergeMap 可以被赋予第二个参数,concurrent,它表示应该同时处理多少内部 observable.使用 active 属性跟踪这些活动内部 observables 的数量.

mergeMap can be given a second argument, concurrent which indicates how many inner observables should handle at the same time. These number of active inner observables is tracked with the active property.

正如在_next方法中看到的,如果active >= concurrentouterValues将被添加到buffercode>,这是一个队列(FIFO).

As seen in _next method, if active >= concurrent, the outerValues will be added to a buffer, which is a queue(FIFO).

然后,当一个活动的内部可观察对象完成时,mergeMap 将从该值中获取最旧的值,并使用提供的函数从中创建一个内部可观察对象:

Then, when one active inner observable completes, mergeMap will take the oldest value from the value and will create an inner observable out of it, using the provided function:

// Called when an inner observable completes
notifyComplete(innerSub: Subscription): void {
  const buffer = this.buffer;
  this.remove(innerSub);
  this.active--;
  if (buffer.length > 0) {
    this._next(buffer.shift()!); // Create a new inner obs. with the oldest buffered value
  } else if (this.active === 0 && this.hasCompleted) {
    this.destination.complete();
  }
}

考虑到这一点,concatMap(project) 就是 mergeMap(project, 1).

With this in mind, concatMap(project) is just mergeMap(project, 1).

所以,如果你有:

of(2, 3, 1)
  .pipe(
    mergeMap(outerValue => timer(outerValue * 100).pipe(mapTo(outerValue)), 1)
  )
  .subscribe(console.log)

这将被记录:

2 \n 3 \n 1.

操作符是函数,它返回另一个函数,它接受一个observable作为它们的唯一参数返回另一个可观察的.当订阅流时,运营商返回的每个 observable 都会有其自己的订阅者.

Operators are functions that return another function which accepts an observable as their only parameter and return another observable. When a stream is being subscribed to, each observable returned by an operator will have its own subscriber.

所有这些订阅者都可以看作是一个链表.例如:

All these subscribers can be seen as a linked list. For example:

// S{n} -> Subscriber `n`, where `n` depends on the order in which the subscribers are created

of(/* ... */)
  .pipe(
    operatorA(), // S{4}
    operatorB(), // S{3}
    operatorC(), // S{2}
  ).subscribe({ /* ... */ }) // S{1}; the observer is converted into a `Subscriber`

S{n}S{n+1} 的父(destination),意味着 S{1}S{2}的目的地,S{2}S{3}的目的地,依此类推.

S{n} is the parent(destination) of S{n+1}, meaning that S{1} is the destination of S{2}, S{2} is the destination of S{3} and so forth.

StackBlitz

比较这些:

of(2, 1, 0)
  .pipe(
    mergeMap(v => timer(v * 100).pipe(mapTo(v)))
  ).subscribe(console.log)
// 0 1 2

of(2, 1, 0)
  .pipe(
    mergeMap(v => timer(v).pipe(mapTo(v)))
  ).subscribe(console.log)
// 1 0 2

根据 MDN:

指定的时间量(或延迟)不是保证执行时间,而是最短执行时间.在主线程上的堆栈为空之前,您传递给这些函数的回调无法运行.

The specified amount of time (or the delay) is not the guaranteed time to execution, but rather the minimum time to execution. The callbacks you pass to these functions cannot run until the stack on the main thread is empty.

因此,像 setTimeout(fn, 0) 这样的代码会在堆栈为空时立即执行,而不是立即执行.如果您执行 setTimeout(fn, 0) 之类的代码,但在运行一个从 1 到 100 亿的循环后立即执行,您的回调将在几秒钟后执行.

As a consequence, code like setTimeout(fn, 0) will execute as soon as the stack is empty, not immediately. If you execute code like setTimeout(fn, 0) but then immediately after run a loop that counts from 1 to 10 billion, your callback will be executed after a few seconds.

这部分由 MDN 提供把事情也说清楚.

This section by MDN should clarify things as well.

我会说这是特定于环境的,而不是特定于 RxJs 的.

I'd say this is environment-specific, rather than RxJs-specific.

在第二个代码段中,延迟是连续的,这就是您获得意外结果的原因.如果你稍微增加延迟,比如:timer(v * 2),你应该得到预期的行为.

In the second snippet, the delays are consecutive so that's why you're getting unexpected results. If you increase the delays just a bit, like: timer(v * 2), you should get the expected behavior.

这篇关于mergeMap 运算符究竟是如何工作的,在什么情况下使用它?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆