使用rxjs实现指数补偿 [英] Exponential backoff implementation with rxjs

查看:86
本文介绍了使用rxjs实现指数补偿的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Angular 7 文档提供了rxjs Observable s为AJAX请求实现指数补偿:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {
 return pipe(
   retryWhen(attempts => range(1, maxTries)
     .pipe(
       zip(attempts, (i) => i),
       map(i => i * i),
       mergeMap(i =>  timer(i * ms))
     )
   )
 );
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}

虽然我了解Observables和backoff的概念,但我还不太清楚retryWhen如何精确计算重新订阅源ajax的时间间隔.

具体来说, zip 解决方案

(出于学习目的)我花了很多时间对此进行研究,并将尝试尽可能全面地解释此代码的工作原理.

首先,这是原始代码,带注释:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {                  // (1)
 return pipe(                                     // (2)
   retryWhen(attempts => range(1, maxTries)       // (3)
     .pipe(
       zip(attempts, (i) => i),                   // (4)
       map(i => i * i),                           // (5)
       mergeMap(i =>  timer(i * ms))              // (6)
     )
   )
 );                                               // (7)
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}

  1. 很容易,我们从retryWhen运算符中创建了自定义backoff运算符.我们稍后将可以在pipe函数中应用此功能.
  2. 在这种情况下,pipe方法返回一个自定义运算符.
  3. 我们的自定义运算符将是修改后的retryWhen运算符.它带有一个函数参数.该函数将被称为一次-特别是在第一次遇到/调用此retryWhen时.顺便说一句,当可观察到的源产生错误时,retryWhen仅在 中起作用.然后,它可以防止错误进一步传播并重新订阅源.如果源产生非错误结果(无论是第一次订阅还是重试),都会跳过retryWhen且不涉及.

    attempts上的几句话.这是可以观察到的.这不是可观察到的源.它是专门为retryWhen创建的.它只有一种用途:只有可观察到的对源可观察项的订阅(或重新订阅)导致错误时,attempts会触发next.我们给了attempts并可以自由使用它,以便以某种方式对每次可观察到的可观察到的源订阅失败做出反应.

    这就是我们要做的.

    首先,我们创建range(1, maxTries),这是一个可观察的对象,对于我们愿意执行的每次重试,它都有一个整数. range准备随时随地发射所有数字,但我们必须坚守己::只有在发生另一次重试时,我们才需要一个新数字.因此,这就是我们...

  4. ...用attempts压缩.意思是,将每个attempts的发射值与单个值range结合起来.

    请记住,我们当前正在使用的功能将仅被调用一次,届时,attempts将仅触发一次next —对于最初的失败订阅.因此,在这一点上,我们的两个压缩观测值只产生了一个值.

    顺便说一句,将两个可观察值压缩成一个的值是多少?该函数确定:(i) => i.为了清楚起见,可以将其写为(itemFromRange, itemFromAttempts) => itemFromRange.第二个参数未使用,因此将其删除,第一个参数重命名为i.

    这里发生的是,我们只是无视attempts触发的,我们只对触发它们的事实感兴趣.每当发生这种情况时,我们都会从range可观察的值中拉出下一个值...

  5. ...并将其平方.这是用于指数补偿的指数部分.

    因此,现在无论何时(重新)订阅源失败,我们手上的整数都会不断增加(1、4、9、16 ...).我们如何将整数转换为时间延迟直到下一次重新订阅?

    请记住,我们当前使用的此函数,必须使用attempts作为输入返回一个可观察的对象.此结果可观察到的对象仅构建一次. retryWhen然后订阅该可观察结果,并且:每当发生可观察的火灾时,重试订阅可观察的源.每当可观察到的结果触发那些相应事件时,在可观察到的源上调用completeerror.

  6. 长话短说,我们需要稍等一下.
    delay 运算符可能是使用,但设置延迟的指数增长可能会很痛苦.取而代之的是mergeMap运算符.

    mergeMap是将两个运算符组合在一起的快捷方式:mapmergeAll. map只是将每个递增的整数(1、4、9、16 ...)转换为一个timer可观察值,该值在经过毫秒数后将触发next. mergeAll强制retryWhen实际订阅timer.如果最后一点没有发生,我们得到的可观察对象将立即使用timer可观察实例作为值触发next.

  7. 至此,我们已经构建了自定义的Observable,retryWhen将使用它来决定何时确切尝试重新订阅源Observable.

就目前而言,我发现此实现存在两个问题:

  • 一旦我们产生的可观察结果触发了最后一个next(导致了最后一次尝试重新订阅),它也会立即触发complete.除非源可观察到的返回结果很快非常(假设最后一次重试将是成功的),否则该结果将被忽略.

    这是因为,一旦retryWhen从我们的观察对象中听到complete,它就会在源上调用complete,这可能仍在发出AJAX请求的过程中.

  • 如果所有重试均未成功,则源实际上会调用complete而不是更具逻辑性的error.

要解决这两个问题,我认为在给最后一次重试提供一定的合理时间以尝试完成其工作后,最终得到的可观察值应在最后触发error.

这是我对上述修复程序的实施,其中还考虑了对zip运算符在最新的rxjs v6中的弃用:

import { delay, dematerialize, map, materialize, retryWhen, switchMap } from "rxjs/operators";
import { concat, pipe, range, throwError, timer, zip } from "rxjs";

function backoffImproved(maxTries, ms) {
    return pipe(
        retryWhen(attempts => {
            const observableForRetries =
                zip(range(1, maxTries), attempts)
                    .pipe(
                        map(([elemFromRange, elemFromAttempts]) => elemFromRange),
                        map(i => i * i),
                        switchMap(i => timer(i * ms))
                    );
            const observableForFailure =
                throwError(new Error('Could not complete AJAX request'))
                    .pipe(
                        materialize(),
                        delay(1000),
                        dematerialize()
                    );
            return concat(observableForRetries, observableForFailure);
        })
    );
}

我测试了此代码,它似乎在所有情况下都能正常工作.我现在不愿意详细解释它;我怀疑有人还会读上面的文字墙.

无论如何,非常感谢@BenjaminGruenbaum和@cartant使我走上正确的道路,将我的脑袋缠在这一切上.

Angular 7 docs provide this example of practical usage of rxjs Observables in implementing an exponential backoff for an AJAX request:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {
 return pipe(
   retryWhen(attempts => range(1, maxTries)
     .pipe(
       zip(attempts, (i) => i),
       map(i => i * i),
       mergeMap(i =>  timer(i * ms))
     )
   )
 );
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}

While I understand the concept of both Observables and backoff, I can’t quite figure out, how exactly retryWhen will calculate time intervals for resubscribing to the source ajax.

Specifically, how do zip, map, and mapMerge work in this setup?

And what’s going to be contained in the attempts object when it’s emitted into retryWhen?

I went through their reference pages, but still can’t wrap my head around this.

解决方案

I have spent quite some time researching this (for learning purposes) and will try to explain the workings of this code as thoroughly as possible.

First, here’s the original code, annotated:

import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';

function backoff(maxTries, ms) {                  // (1)
 return pipe(                                     // (2)
   retryWhen(attempts => range(1, maxTries)       // (3)
     .pipe(
       zip(attempts, (i) => i),                   // (4)
       map(i => i * i),                           // (5)
       mergeMap(i =>  timer(i * ms))              // (6)
     )
   )
 );                                               // (7)
}

ajax('/api/endpoint')
  .pipe(backoff(3, 250))
  .subscribe(data => handleData(data));

function handleData(data) {
  // ...
}

  1. Easy enough, we’re creating custom backoff operator out of retryWhen operator. We’ll be able to apply this later within pipe function.
  2. In this context, pipe method returns a custom operator.
  3. Our custom operator is going to be a modified retryWhen operator. It takes a function argument. This function is going to be called once — specifically, when this retryWhen is first encountered/invoked. By the way, retryWhen gets into play only when the source observable produces an error. It then prevents error from propagating further and resubscribes to the source. If the source produces a non-error result (whether on first subscription or on a retry), retryWhen is passed over and is not involved.

    A few words on attempts. It’s an observable. It is not the source observable. It is created specifically for retryWhen. It has one use and one use only: whenever subscription (or re-subscription) to the source observable results in an error, attempts fires a next. We are given attempts and are free to use it in order to react in some way to each failed subscription attempt to the source observable.

    So that’s what we are going to do.

    First we create range(1, maxTries), an observable that has an integer for every retry we are willing to perform. range is ready to fire all it’s numbers right then and there, but we have to hold its horses: we only need a new number when another retry happens. So, that’s why we...

  4. ... zip it with the attempts. Meaning, marry each emitted value of attempts with a single value of range.

    Remember, function we’re currently in is going to be called only once, and at that time, attempts will have only fired next once — for the initial failed subscription. So, at this point, our two zipped observables have produced just one value.

    Btw, what are the values of the two observables zipped into one? This function decides that: (i) => i. For clarity it can be written (itemFromRange, itemFromAttempts) => itemFromRange. Second argument is not used, so it’s dropped, and first is renamed into i.

    What happens here, is we simply disregard the values fired by attempts, we are only interested in the fact that they are fired. And whenever that happens we pull the next value from range observable...

  5. ...and square it. This is for the exponential part of the exponential backoff.

    So, now whenever (re-)subscription to source fails, we have an ever increasing integer on our hands (1, 4, 9, 16...). How do we transform that integer into a time delay until next re-subscription?

    Remember, this function we are currently inside of, it must return an observable, using attempts as input. This resulting observable is only built once. retryWhen then subscribes to that resulting observable and: retries subscribing to source observable whenever resulting observable fires next; calls complete or error on source observable whenever resulting observable fires those corresponding events.

  6. Long story short, we need to make retryWhen wait a bit. delay operator could maybe be used, but setting up exponential growth of the delay would likely be pain. Instead, mergeMap operator comes into play.

    mergeMap is a shortcut for two operators combined: map and mergeAll. map simply converts every increasing integer (1, 4, 9, 16...) into a timer observable which fires next after passed number of milliseconds. mergeAll forces retryWhen to actually subscribe to timer. If that last bit didn’t happen, our resulting observable would just fire next immediately with timer observable instance as value.

  7. At this point, we’ve built our custom observable which will be used by retryWhen to decide when exactly to attempt to re-subscribe to source observable.

As it stands I see two problems with this implementation:

  • As soon as our resulting observable fires its last next (causing the last attempt to resubscribe), it also immediately fires complete. Unless the source observable returns result very quickly (assuming that the very last retry will be the one that succeeds), that result is going to be ignored.

    This is because as soon as retryWhen hears complete from our observable, it calls complete on source, which may still be in the process of making AJAX request.

  • If all retries were unsuccessful, source actually calls complete instead of more logical error.

To solve both these issues, I think that our resulting observable should fire error at the very end, after giving the last retry some reasonable time to attempt to do its job.

Here’s my implementation of said fix, which also takes into account deprecation of zip operator in latest rxjs v6:

import { delay, dematerialize, map, materialize, retryWhen, switchMap } from "rxjs/operators";
import { concat, pipe, range, throwError, timer, zip } from "rxjs";

function backoffImproved(maxTries, ms) {
    return pipe(
        retryWhen(attempts => {
            const observableForRetries =
                zip(range(1, maxTries), attempts)
                    .pipe(
                        map(([elemFromRange, elemFromAttempts]) => elemFromRange),
                        map(i => i * i),
                        switchMap(i => timer(i * ms))
                    );
            const observableForFailure =
                throwError(new Error('Could not complete AJAX request'))
                    .pipe(
                        materialize(),
                        delay(1000),
                        dematerialize()
                    );
            return concat(observableForRetries, observableForFailure);
        })
    );
}

I tested this code and it seems to work properly in all cases. I can’t be bothered to explain it in detail right now; I doubt anyone will even read the wall of text above.

Anyway, big thanks to @BenjaminGruenbaum and @cartant for setting me onto right path for wrapping my head around all this.

这篇关于使用rxjs实现指数补偿的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆