使用 rxjs 实现指数退避 [英] Exponential backoff implementation with rxjs

查看:27
本文介绍了使用 rxjs 实现指数退避的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Angular 7 docs 提供了 rxjs Observable 为 AJAX 请求实现指数退避:

Angular 7 docs provide this example of practical usage of rxjs Observables in implementing an exponential backoff for an AJAX request:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {
 return pipe(
   retryWhen(attempts => range(1, maxTries)
     .pipe(
       zip(attempts, (i) => i),
       map(i => i * i),
       mergeMap(i =>  timer(i * ms))
     )
   )
 );
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}

虽然我理解 Observables 和退避的概念,但我不太清楚,retryWhen 将如何计算重新订阅源 ajax 的时间间隔.

While I understand the concept of both Observables and backoff, I can’t quite figure out, how exactly retryWhen will calculate time intervals for resubscribing to the source ajax.

具体如何zipmapmapMerge 在这个设置中工作吗?

Specifically, how do zip, map, and mapMerge work in this setup?

attempts 对象被发送到 retryWhen 时,它会包含什么?

And what’s going to be contained in the attempts object when it’s emitted into retryWhen?

我浏览了他们的参考页面,但仍然无法理解这一点.

I went through their reference pages, but still can’t wrap my head around this.

推荐答案

我花了相当多的时间来研究这个(为了学习目的),并将尝试尽可能彻底地解释这段代码的工作原理.

I have spent quite some time researching this (for learning purposes) and will try to explain the workings of this code as thoroughly as possible.

首先,这是原始代码,注释:

First, here’s the original code, annotated:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {                  // (1)
 return pipe(                                     // (2)
   retryWhen(attempts => range(1, maxTries)       // (3)
     .pipe(
       zip(attempts, (i) => i),                   // (4)
       map(i => i * i),                           // (5)
       mergeMap(i =>  timer(i * ms))              // (6)
     )
   )
 );                                               // (7)
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}

  1. 很简单,我们正在使用 retryWhen 运算符创建自定义 backoff 运算符.我们稍后可以在 pipe 函数中应用它.
  2. 在这种情况下,pipe 方法返回一个自定义运算符.
  3. 我们的自定义运算符将是修改后的 retryWhen 运算符.它需要一个函数参数.这个函数将被调用一次——特别是当这个 retryWhen 被第一次遇到/调用时.顺便说一下,retryWhen在源 observable 产生错误时起作用.然后它可以防止错误进一步传播并重新订阅源.如果源产生一个非错误结果(无论是在第一次订阅还是在重试时),retryWhen 就会被忽略并且不参与.

  1. Easy enough, we’re creating custom backoff operator out of retryWhen operator. We’ll be able to apply this later within pipe function.
  2. In this context, pipe method returns a custom operator.
  3. Our custom operator is going to be a modified retryWhen operator. It takes a function argument. This function is going to be called once — specifically, when this retryWhen is first encountered/invoked. By the way, retryWhen gets into play only when the source observable produces an error. It then prevents error from propagating further and resubscribes to the source. If the source produces a non-error result (whether on first subscription or on a retry), retryWhen is passed over and is not involved.

关于尝试的几句话.这是一个可观察的.它不是可观察的源.它是专门为 retryWhen 创建的.它只有一种用途和一种用途:每当订阅(或重新订阅)源 observable 导致错误时,attempts 会触发 next.我们有 attempts 并且可以自由使用它,以便以某种方式对每个失败的订阅源 observable 尝试做出反应.

A few words on attempts. It’s an observable. It is not the source observable. It is created specifically for retryWhen. It has one use and one use only: whenever subscription (or re-subscription) to the source observable results in an error, attempts fires a next. We are given attempts and are free to use it in order to react in some way to each failed subscription attempt to the source observable.

这就是我们要做的.

首先,我们创建 range(1, maxTries),一个 observable,对于我们愿意执行的每次重试都有一个整数.range 已准备好立即触发它的所有数字,但我们必须控制住它:我们只需要在发生另一次重试时提供一个新数字.所以,这就是为什么我们...

First we create range(1, maxTries), an observable that has an integer for every retry we are willing to perform. range is ready to fire all it’s numbers right then and there, but we have to hold its horses: we only need a new number when another retry happens. So, that’s why we...

... 用 attempts 压缩它.意思是,将 attempts 的每个发出值与 range 的单个值结合起来.

... zip it with the attempts. Meaning, marry each emitted value of attempts with a single value of range.

请记住,我们当前所在的函数只会被调用一次,届时,attempts 只会触发一次 next — 对于初始失败订阅.所以,在这一点上,我们的两个压缩 observable 只产生了一个值.

Remember, function we’re currently in is going to be called only once, and at that time, attempts will have only fired next once — for the initial failed subscription. So, at this point, our two zipped observables have produced just one value.

顺便说一句,将两个 observables 压缩为一个的值是什么?这个函数决定:(i) =>我.为了清楚起见,它可以写成 (itemFromRange, itemFromAttempts) =>itemFromRange.第二个参数没有使用,所以去掉了,第一个重命名为 i.

Btw, what are the values of the two observables zipped into one? This function decides that: (i) => i. For clarity it can be written (itemFromRange, itemFromAttempts) => itemFromRange. Second argument is not used, so it’s dropped, and first is renamed into i.

这里发生的事情是,我们只是忽略了由 attempts 触发的 values,我们只对它们被触发的 事实 感兴趣.每当发生这种情况时,我们都会从 range observable 中提取下一个值...

What happens here, is we simply disregard the values fired by attempts, we are only interested in the fact that they are fired. And whenever that happens we pull the next value from range observable...

...并平方.这是指数退避的指数部分.

...and square it. This is for the exponential part of the exponential backoff.

因此,现在每当(重新)订阅源失败时,我们手上的整数就会不断增加(1、4、9、16...).我们如何将该整数转换为下一次重新订阅之前的时间延迟?

So, now whenever (re-)subscription to source fails, we have an ever increasing integer on our hands (1, 4, 9, 16...). How do we transform that integer into a time delay until next re-subscription?

请记住,我们当前所在的这个函数必须返回一个可观察对象,使用 attempts 作为输入.这个结果 observable 只构建一次.retryWhen 然后订阅结果 observable 并且: 每当结果 observable 触发 next 时,重试订阅源 observable;每当结果 observable 触发相应的事件时,就会对源 observable 调用 completeerror.

Remember, this function we are currently inside of, it must return an observable, using attempts as input. This resulting observable is only built once. retryWhen then subscribes to that resulting observable and: retries subscribing to source observable whenever resulting observable fires next; calls complete or error on source observable whenever resulting observable fires those corresponding events.

长话短说,我们需要让 retryWhen 稍等一下.延迟 可能会使用运算符,但设置延迟的指数增长可能会很痛苦.相反,mergeMap 操作符开始发挥作用.

Long story short, we need to make retryWhen wait a bit. delay operator could maybe be used, but setting up exponential growth of the delay would likely be pain. Instead, mergeMap operator comes into play.

mergeMap 是两个运算符组合的快捷方式:mapmergeAll.map 简单地将每个递增的整数 (1, 4, 9, 16...) 转换为一个 timer observable,它在传递的数量之后触发 next毫秒.mergeAll 强制 retryWhen 实际订阅 timer.如果最后一点没有发生,我们生成的 observable 将立即使用 timer observable 实例作为值触发 next.

mergeMap is a shortcut for two operators combined: map and mergeAll. map simply converts every increasing integer (1, 4, 9, 16...) into a timer observable which fires next after passed number of milliseconds. mergeAll forces retryWhen to actually subscribe to timer. If that last bit didn’t happen, our resulting observable would just fire next immediately with timer observable instance as value.

此时,我们已经构建了我们的自定义 observable,retryWhen 将使用它来决定何时尝试重新订阅源 observable.

At this point, we’ve built our custom observable which will be used by retryWhen to decide when exactly to attempt to re-subscribe to source observable.

就目前而言,我认为此实现存在两个问题:

As it stands I see two problems with this implementation:

  • 一旦我们生成的 observable 触发它的最后一个 next(导致最后一次尝试重新订阅),它也会立即触发 complete.除非源 observable 非常 返回结果(假设最后一次重试将是成功的),否则该结果将被忽略.

  • As soon as our resulting observable fires its last next (causing the last attempt to resubscribe), it also immediately fires complete. Unless the source observable returns result very quickly (assuming that the very last retry will be the one that succeeds), that result is going to be ignored.

这是因为一旦 retryWhen 从我们的 observable 中听到 complete ,它就会在源代码上调用 complete ,它可能仍在进程中发出 AJAX 请求.

This is because as soon as retryWhen hears complete from our observable, it calls complete on source, which may still be in the process of making AJAX request.

如果所有重试都不成功,源实际上会调用complete,而不是更合乎逻辑的error.

If all retries were unsuccessful, source actually calls complete instead of more logical error.

为了解决这两个问题,我认为我们得到的 observable 应该在最后触发 error,在给最后一次重试一些合理的时间来尝试完成它的工作之后.

To solve both these issues, I think that our resulting observable should fire error at the very end, after giving the last retry some reasonable time to attempt to do its job.

这是我对上述修复的实现,它还考虑了最新 rxjs v6zip 运算符的弃用:

Here’s my implementation of said fix, which also takes into account deprecation of zip operator in latest rxjs v6:

import { delay, dematerialize, map, materialize, retryWhen, switchMap } from "rxjs/operators";
import { concat, pipe, range, throwError, timer, zip } from "rxjs";

function backoffImproved(maxTries, ms) {
    return pipe(
        retryWhen(attempts => {
            const observableForRetries =
                zip(range(1, maxTries), attempts)
                    .pipe(
                        map(([elemFromRange, elemFromAttempts]) => elemFromRange),
                        map(i => i * i),
                        switchMap(i => timer(i * ms))
                    );
            const observableForFailure =
                throwError(new Error('Could not complete AJAX request'))
                    .pipe(
                        materialize(),
                        delay(1000),
                        dematerialize()
                    );
            return concat(observableForRetries, observableForFailure);
        })
    );
}

我测试了这段代码,它似乎在所有情况下都能正常工作.我现在懒得详细解释;我怀疑有人会阅读上面的文字墙.

I tested this code and it seems to work properly in all cases. I can’t be bothered to explain it in detail right now; I doubt anyone will even read the wall of text above.

无论如何,非常感谢@BenjaminGruenbaum 和@cartant 让我走上正确的道路,让我的头脑围绕这一切.

Anyway, big thanks to @BenjaminGruenbaum and @cartant for setting me onto right path for wrapping my head around all this.

这篇关于使用 rxjs 实现指数退避的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆