如何构建一个在前一个 ajax 承诺解决后等待一段时间的 rx 轮询器? [英] How to build an rx poller that waits some interval AFTER the previous ajax promise resolves?

查看:11
本文介绍了如何构建一个在前一个 ajax 承诺解决后等待一段时间的 rx 轮询器?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

一直在研究解决此问题的几种方法.基本上,我不想要一个从轮询开始后每 30 秒启动一次 ajax 的轮询器——我想要一个在前一个请求返回后 30 秒启动请求的轮询器.另外,我想围绕失败的指数退避制定一些策略.

Been working on a few approaches to this. Basically, I don't want a poller that kicks off an ajax every 30 seconds from the start of polling -- I want a poller that kicks off requests 30 seconds AFTER the previous request returns. Plus, I want to work in some strategy around exponential back-off for failures.

这是我目前所拥有的(Rx4):

Here's what I have so far (Rx4):

rx.Observable.create(function(observer) {
    var nextPoll = function(obs) {
      // the action function invoked below is what i'm passing in
      // to my poller service as any function which returns a promise
      // and must be invoked each loop due to fromPromise caching
      rx.Observable.fromPromise(action())
        .map(function (x){ return x.data; })
        .subscribe(function(d) {       
            // pass promise up to parent observable  
            observer.onNext(d);

            // reset interval in case previous call was an error
            interval = initInterval;   
            setTimeout(function(){ nextPoll(obs); }, interval);
        }, function(e) {
          // push interval higher (exponential backoff)
          interval = interval < maxInterval ? interval * 2 : maxInterval;
          setTimeout(function(){ nextPoll(obs); }, interval);

        });
    };
    nextPoll(observer);
});

在大多数情况下,这就是我想要的.我不喜欢使用 setTimeout,但我似乎找不到更好的 Observable 方法(除了一次性间隔/计时器与另一个订阅者).

For the most part, this does what I want. I don't like the use of setTimeout, but I can't seem to find a better Observable approach to this (other than a one-off interval/timer with another subscribe).

我无法解决的另一件事是能够控制轮询器在最初启动时是延迟启动还是立即启动.对于某些用途,我将在开始轮询之前获取数据,因此我可以让它在第一次触发之前等待一段时间.到目前为止,我只对发生在第一个 ajax 之前或在 ajax 之间发生的计时器/延迟感到幸运,并将其提供给订阅者,这对我不起作用.

The other thing that I haven't been able to work into this is the ability to control whether the poller, when initially started, can start with a delay or fire immediately. For some uses, I will have just fetched the data prior to starting to poll, so I can let it wait the interval before firing for the first time. So far, I've only had luck with timer/delay that happens before the first ajax or between the ajax and providing it to subscribers, which doesn't work for me.

非常感谢您对清理此内容的任何想法,无论是在一般情况下还是在摆脱 setTimeout 方面.而且,如果有人有办法以可选的延迟启动这个轮询器,那将是非常棒的!谢谢大家!!

Would appreciate any thoughts on cleaning this in, both generally and in terms of getting rid of the setTimeout. And, if anyone has a way to kick off this poller with an optional delay, that would be tremendous! Thanks all!!

更新:终于按照我想象的方式工作了.这是它的样子:

UPDATE: Finally got this working the way I envisioned. Here's what that looks like:

function computeInterval(error) {
  if (error) {
    // double until maximum interval on errors
    interval = interval < maxInterval ? interval * 2 : maxInterval;
  } else {
    // anytime the poller succeeds, make sure we've reset to
    // default interval.. this also allows the initInterval to 
    // change while the poller is running
    interval = initInterval;
  }
  return interval;
}

poller$ = rx.Observable.fromPromise(function(){ return _this.action(); })
  .retryWhen(function(errors){
    return errors.scan(function(acc, x) { return acc + x; }, 0)
      .flatMap(function(x){ 
        return rx.Observable.timer(computeInterval(true));
      });
  })
  .repeatWhen(function(notification){
    return notification
      .scan(function(acc, x) { return acc + x; }, 0)
      .flatMap(function(x){ 
        return rx.Observable.timer(computeInterval());
      });
  });

推荐答案

我使用另一种可能有用和/或可能更简单的技术添加了另一个答案.它基于 repeatWhen 运算符.正如 文档 所解释的那样,它允许你重复订阅一个 observable,直到你发出重复结束的信号,表明正常完成或错误.

I add another answer using another technique which can be useful and/or maybe simpler. It is based on the repeatWhen operator. As the documentation explains, it allows you to repeat subscription to an observable till you signal the end of the repetition indicating normal completion or an error.

var action; // your action function
Rx.Observable.create(function (observer) {
    function executeAction(action) {
        return Rx.Observable.fromPromise(action());
    }

    function computeDelay(){
        // put your exponential delaying logic here
    }

executeAction()
  .repeatWhen(function(notification){
    return Rx.Observable.return({}).delay(computeDelay());
  })
  .subscribe(function(x){
    observer.onNext(x.data);
  })
});

有一篇很棒的文章这里比官方文档更好地解释了 repeatWhenretryWhen (它适用于 RxJava,但它适用于 Rxjs 并进行了一些小的修改).它还给出了您想要实现的确切目标的示例(指数延迟重复).

There is a terrific article here explaining repeatWhen and retryWhen better than the official documentation does (it is for RxJava but it applies to Rxjs with some minor modifications). It also gives an example of exactly what you want to achieve (exponential delayed repeating).

这篇关于如何构建一个在前一个 ajax 承诺解决后等待一段时间的 rx 轮询器?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆