¿mergeMap运算符的工作原理是什么?在哪种情况下会使用它? [英] ¿How exactly does the mergeMap operator work and in which cases is it used?

查看:51
本文介绍了¿mergeMap运算符的工作原理是什么?在哪种情况下会使用它?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在来到这里之前,我已经阅读了Rxjs的官方文档和其他一些页面,但是我仍然不清楚.我的理解是:

Before coming here I have read the official documentation of Rxjs and some other pages but I am still not clear. What I understood is this:

它用于连接"两个可观察对象,从而获得一个单一的可观察对象,我还看到它用于展平"一个可观察对象(我也不是很清楚).

It is used to "join" 2 observables and thus obtain a single observable as a result, I also saw that it is used to "flatten" an observable (I am also not very clear).

现在...我有几天尝试用Angular和Node.js和Express编写用户注册表,我发现了一个我决定使用的小教程,它具有以下代码:

Now ... I have days trying to program a user registry using Angular and Node.js with Express and I found a little tutorial which I decided to use and it has this code:

import { Injectable, Injector } from '@angular/core';
import { HttpClient, HttpInterceptor, HttpRequest, HttpHandler, HttpEvent, HttpErrorResponse } from '@angular/common/http';
import { Observable, throwError } from 'rxjs';
import { catchError, retry, mergeMap } from 'rxjs/operators'
import { AuthenticationService } from './authentication.service';

@Injectable({
	providedIn: 'root'
})
export class AppInterceptor implements HttpInterceptor {

	constructor(private injector: Injector) { }

	intercept(req: HttpRequest<any>, next: HttpHandler): Observable<HttpEvent<any>> {
		let accessToken = "", refreshToken = ""

		const tokens = JSON.parse(sessionStorage.getItem("tokens"))
		if (tokens) {
			accessToken = tokens.accessToken
			refreshToken = tokens.refreshToken
		}

		let clonHttp: HttpRequest<any> 
		clonHttp = tokens ? req.clone({ headers: req.headers.append("Authorization", `Bearer ${accessToken}`) }) : req
		
		let auth = this.injector.get(AuthenticationService);

		return next.handle(clonHttp)
			.pipe(
				catchError((error: HttpErrorResponse) => {
					if (error.error instanceof ErrorEvent) {
						console.log("error event")
					} else if (error.status == 401) {
						return auth.getNewAccessToken(refreshToken)
							.pipe(
								retry(3),
								mergeMap(
									(response: any) => {
										tokens.accessToken = response.accessToken
										sessionStorage.setItem("tokens", JSON.stringify(tokens))

										clonHttp = req.clone({ headers: req.headers.append("Authorization", `Bearer ${response.accessToken}`) })
										return next.handle(clonHttp)
									}
								)
							)
					} else if (error.status == 409) {
						return throwError("User not logged")
					} else {
						if (error.error && error.error.message) {
							return throwError(error.error.message)
						} else {
							return throwError("Check your connection")
						}
					}
				})
			)
	}

}

如果看到的话,当您使用MergeMap运算符时,它们只会向您传递答案(一个可观察到的答案),或者至少这是我所看到的.我要说的是,我没有看到他们将它与2个可观察变量一起使用或将2个可观察变量混合使用,这是我在他们的官方文档中所读到的,实际上,在他们显示他们始终使用的示例中它带有2个可观察值.

If you see, when you use the MergeMap operator they only pass you the answer (a single observable), or at least that's what I can see. What I'm trying to say is that I don't see that they are using it with 2 observables or to mix 2 observables, which is what I have read in their official documentation, in fact, in the examples they show they always use it with 2 observables.

老实说,对于我来说很难理解这个运算符,如果有人能够以一种简单的方式帮助我理解它,那么我将不胜感激,除了理解我在前面展示的代码中的用法.提前问候.谢谢!

Honestly it has been too difficult for me to understand this operator, if someone could help me understand it in a simple way, I would be extremely grateful, in addition to understanding its use in that code that I show earlier. Greetings in advance. Thank you!

推荐答案

mergeMap 维护一个或多个内部可观测值

使用外部值提供的功能创建内部可观察的.外部价值本质上只是从其来源收到的价值.例如:

An inner observable is created with the outer value and the provided function. The outer value essentially is just the value received from its source. For example:

of(1, 2, 3).pipe(
  mergeMap((outerValue, index) => /* ... return an observable ... */)
).subscribe(); // `outerValue`: 1, 2, 3 (separately)

输入外部值时,将创建一个新的内部可观察值.我认为最好的理解方法是查看源代码:

When an outer value comes in, a new inner observable will be created. I think the best way to understand this is to have a look at the source code:

// `value` - the `outerValue`
protected _next(value: T): void {
  if (this.active < this.concurrent) {
    this._tryNext(value);
  } else {
    this.buffer.push(value);
  }
}

protected _tryNext(value: T) {
  let result: ObservableInput<R>;
  const index = this.index++;
  try {
    // Create the inner observable based on the `outerValue` and the provided function (`this.project`)
    // `mergeMap(project)`
    result = this.project(value, index);
  } catch (err) {
    this.destination.error(err);
    return;
  }
  this.active++;
  // Subscribe to the inner observable
  this._innerSub(result, value, index);
}

请暂时不要使用 concurrent buffer ,稍后我们将对其进行介绍.

Please disregard for now concurrent and buffer, we'll have a look at them a bit later.

现在,当一个内部可观测对象发出时会发生什么?在进一步介绍之前,值得一提的是,尽管很明显,内部可观察 需要内部订阅者.我们可以从上方的 _innerSub 方法中看到这一点:

Now, what happens when an inner observable emits ? Before going any further, it's worth mentioning that, although it's obvious, an inner observable requires an inner subscriber. We can see this in the _innerSub method from above:

private _innerSub(ish: ObservableInput<R>, value: T, index: number): void {
  const innerSubscriber = new InnerSubscriber(this, value, index);
  const destination = this.destination as Subscription;
  destination.add(innerSubscriber);
  // This is where the subscription takes place
  subscribeToResult<T, R>(this, ish, undefined, undefined, innerSubscriber);
}

当内部观察对象发出时,

When an inner observable emits, the notifyNext method will be called:

notifyNext(outerValue: T, innerValue: R,
            outerIndex: number, innerIndex: number,
            innerSub: InnerSubscriber<T, R>): void {
  this.destination.next(innerValue);
}

目的地指向链中的下一个订户.例如,可以是这样:

Where destination points to the next subscriber in the chain. For example, it can be this:

of(1)
  .pipe(
    mergeMap(/* ... */)
  )
  .subscribe({} /* <- this is the `destination` for `mergeMap` */)

这将在下面的有关链中的下一个订户 中更详细地说明.

This will be explained in more detail in What about the next subscriber in the chain below.

那么,混合2个可观察对象是什么意思?

让我们看看这个例子:

of(2, 3, 1)
  .pipe(
    mergeMap(outerValue => timer(outerValue).pipe(mapTo(outerValue)))
  )
  .subscribe(console.log)
  /* 1 \n 2 \n 3 */

2 到达时, mergeMap 将订阅一个内部可观察对象,该内部可观察对象将以 200 ms发出.这是一个异步操作,但请注意,外部值(2,3,1)是同步到达的.接下来, 3 到达并创建一个内部对象.会在 300 ms内发出.由于当前脚本尚未完成执行,因此尚未考虑回调队列.现在 1 到了,将创建一个内部对象.将在 100 毫秒内发出.

When 2 arrives, mergeMap will subscribe to an inner observable that will emit in 200ms. This is an asynchronous action, but notice that the outer values(2, 3, 1) arrive synchronously. Next, 3 arrives and will create an inner obs. that will emit in 300ms. Since the current script has not finished executing yet, the callback queue is not yet considered. Now 1 arrives, and will create an inner obs. that will emit in 100 ms.

mergeMap 现在具有3个内部可观测对象,并将传递内部可观测对象发出的内部值.
如预期的那样,我们得到 1 2 3 .

mergeMap has now 3 inner observables and will pass along the inner value of whichever inner observable emits.
As expected, we get 1, 2, 3.

这就是 mergeMap 的作用.可观察混合对象可以这样考虑:如果有一个外部值并且已经创建了一个内部可观察对象,那么 mergeMap 只会说:没问题,我会只需创建一个新的内部obs.并订阅".

So that's what mergeMap does. Mixing observables can be thought of this way: if an outer value comes and an inner observable has already been created, then mergeMap simply says: "no problem, I'll just create a new inner obs. and subscribe to it".

mergeMap 提供第二个参数 concurrent ,该参数指示应同时处理多少个内部可观测对象.这些活动内部可观察量的数量是通过 active 属性进行跟踪的.

mergeMap can be given a second argument, concurrent which indicates how many inner observables should handle at the same time. These number of active inner observables is tracked with the active property.

_next 方法所示,如果 active> =并发,则 outerValues 将添加到 buffer ,它是一个队列( FIFO ).

As seen in _next method, if active >= concurrent, the outerValues will be added to a buffer, which is a queue(FIFO).

然后,当一个活动的内部可观察对象完成完成时, mergeMap 将从值中获取最旧的值,并使用提供的函数从中创建一个内部可观察对象:

Then, when one active inner observable completes, mergeMap will take the oldest value from the value and will create an inner observable out of it, using the provided function:

// Called when an inner observable completes
notifyComplete(innerSub: Subscription): void {
  const buffer = this.buffer;
  this.remove(innerSub);
  this.active--;
  if (buffer.length > 0) {
    this._next(buffer.shift()!); // Create a new inner obs. with the oldest buffered value
  } else if (this.active === 0 && this.hasCompleted) {
    this.destination.complete();
  }
}

请记住, concatMap(project)只是 mergeMap(project,1).

所以,如果您有:

of(2, 3, 1)
  .pipe(
    mergeMap(outerValue => timer(outerValue * 100).pipe(mapTo(outerValue)), 1)
  )
  .subscribe(console.log)

这将被记录:

2 \ n 3 \ n 1 .

操作员是功能,可以返回另一个功能,该功能接受可观察作为其唯一参数,然后返回另一个可观察的.订阅流时,运营商返回的每个可观察对象将具有其自己的订阅者.

Operators are functions that return another function which accepts an observable as their only parameter and return another observable. When a stream is being subscribed to, each observable returned by an operator will have its own subscriber.

所有这些订阅者都可以看作是一个链表.例如:

All these subscribers can be seen as a linked list. For example:

// S{n} -> Subscriber `n`, where `n` depends on the order in which the subscribers are created

of(/* ... */)
  .pipe(
    operatorA(), // S{4}
    operatorB(), // S{3}
    operatorC(), // S{2}
  ).subscribe({ /* ... */ }) // S{1}; the observer is converted into a `Subscriber`

S {n} S {n + 1} 的父对象(目标),表示 S {1} S {2} 的目标, S {2} S {3} 的目标,依此类推

S{n} is the parent(destination) of S{n+1}, meaning that S{1} is the destination of S{2}, S{2} is the destination of S{3} and so forth.

StackBlitz

比较这些:

of(2, 1, 0)
  .pipe(
    mergeMap(v => timer(v * 100).pipe(mapTo(v)))
  ).subscribe(console.log)
// 0 1 2

of(2, 1, 0)
  .pipe(
    mergeMap(v => timer(v).pipe(mapTo(v)))
  ).subscribe(console.log)
// 1 0 2

根据 MDN :

指定的时间量(或延迟)不是保证的执行时间,而是最短的执行时间.在主线程上的堆栈为空之前,传递给这些函数的回调将无法运行.

The specified amount of time (or the delay) is not the guaranteed time to execution, but rather the minimum time to execution. The callbacks you pass to these functions cannot run until the stack on the main thread is empty.

因此,像setTimeout(fn,0)这样的代码将在堆栈为空时立即执行,而不是立即执行.如果您执行类似setTimeout(fn,0)之类的代码,但是在运行了从1到100亿的循环之后,您的回调将在几秒钟后执行.

As a consequence, code like setTimeout(fn, 0) will execute as soon as the stack is empty, not immediately. If you execute code like setTimeout(fn, 0) but then immediately after run a loop that counts from 1 to 10 billion, your callback will be executed after a few seconds.

MDN的

本节应还要弄清楚事情.

This section by MDN should clarify things as well.

我会说这是特定于环境的,而不是特定于RxJs的.

I'd say this is environment-specific, rather than RxJs-specific.

在第二个片段中,延迟是连续的,所以这就是为什么您得到意外结果的原因.如果您稍微增加延迟,例如: timer(v * 2),您应该会得到预期的行为.

In the second snippet, the delays are consecutive so that's why you're getting unexpected results. If you increase the delays just a bit, like: timer(v * 2), you should get the expected behavior.

这篇关于¿mergeMap运算符的工作原理是什么?在哪种情况下会使用它?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆