rxjs对具有可变响应时间的端点进行定期轮询 [英] rxjs periodic polling of an endpoint with a variable response time

查看:94
本文介绍了rxjs对具有可变响应时间的端点进行定期轮询的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想对端点进行轮询的速度不超过每秒一次,而又不低于对端点进行轮询所花费的时间.未决的请求不应超过一个.

I want to poll an endpoint no faster than once a second, and no slower than the time it takes to poll the endpoint. There should never be more than one request outstanding.

我想要一种反应式编程方式,至少每秒轮询一次端点,但是如果该端点花费的时间超过1秒,那么下一个请求将立即触发.

I want a reactive programming way to poll an endpoint at least once a second, but if the endpoint takes longer than 1 second, the next request fires immediately.

在下面的大理石图中,第2个和第3个请求花费的时间超过1秒,但是第4个和第5个请求的完成速度更快.下一个请求将在1秒边界上触发,或者在从上一个未完成的请求中获取数据后立即触发.

In the marble diagram below, the 2nd and 3rd requests take longer than 1 second, but the 4th and 5th requests finish quicker. The next request fires either on the 1 second boundary, or immediately upon obtaining the data from the last outstanding request.

s---s---s---s---s---s---| # 1 second interval observable
r---r----r--------r-r---| # endpoint begin polling events
-d-------d--------dd-d--| # endpoint data response events

我正在努力使大理石图中的术语正确,所以我 假设端点请求的开始应为 我标记为"r"的大理石事件,而我标记为"d"的大理石事件具有终点数据.

I'm trying to get the terminology correct in the marble diagram, so I'm assuming that the beginning of the endpoint requests should be the marble I label "r", and the marble event I label "d" has the endpoint data.

这是我用普通js完成此操作所需的代码量;但是,后续请求不会像我上面所问的那样立即触发.

Here's how much code it took me to do this in plain js; however, the subsequent requests do not fire immediately upon being obtained as I have asked above.

    var poll;
    var previousData;
    var isPolling = false;
    var dashboardUrl = 'gui/metrics/dashboard';
    var intervalMs = updateServiceConfig.getIntervalInMilliSecondForCharts();

    return {
        startInterval: startInterval,
        stopInterval: stopInterval
    };

    function startInterval() {
        stopInterval();
        tryPolling(); // immediately hit the dashboard
        // attempt polling at the interval
        poll = $interval(tryPolling, intervalMs);
    }

    /**
     * attempt polling as long as there is no in-flight request
     * once the in-flight request completes or fails, allow the next request to be processed
     */
    function tryPolling() {
        if (!isPolling) {
            isPolling = true;

            getDashboard()
            // if the dashboard either returns successful or fails, reset the polling boolean
                .then(resetPolling, resetPolling);
        }
    }

    /** there's no longer an in-flight request, so reset the polling boolean */
    function resetPolling() {
        isPolling = false;
    }

    function stopInterval() {
        if (poll) {
            $interval.cancel(poll);
            poll = undefined;
        }
    }

    function getDashboard() {
        return restfulService.get(dashboardUrl)
            .then(updateDashboard);
    }

    function updateDashboard(data) {
        if (!utils.deepEqual(data, previousData)) {
            previousData = angular.copy(data);
            $rootScope.$broadcast('$dashboardLoaded', data);
        }
    }

推荐答案

这是我的解决方案.它使用内部主题combineLatestfilter来确保如果响应比timer周期慢,则请求不会累积.

Here is my solution. It uses an internal subject, combineLatest and filter to ensure that requests don't accumulate if the responses are slower to arrive than the timer period.

评论应说明其工作原理.

The comments should explain how it works.

const delays = [100, 2000, 100, 3000];
const since = Date.now();
let index = 0;

function mock() {
    return Rx.Observable
    .of("res")
    .do(() => console.log("mock req at ", Date.now() - since, " ms"))
    .delay(delays[index++ % delays.length])
    .do(() => console.log("mock res at ", Date.now() - since, " ms"));
}

function poll() {

  return Rx.Observable.defer(() => {

    // Use defer so that the internal subject is created for each
    // subscription.
    const subject = new Rx.BehaviorSubject({ tick: -1, pending: false });

    return Rx.Observable
    
      // Combine the timer and the subject's state.
      .combineLatest(
        Rx.Observable.timer(0, 1000).do(tick => console.log("tick", tick)),
        subject
      )

      // Filter out combinations in which either a more recent tick
      // has not occurred or a request is pending.
      .filter(([tick, state]) => (tick !== state.tick) && !state.pending)

      // Update the subject's state.
      .do(([tick]) => subject.next({ tick, pending: true }))
      
      // Make the request and use the result selector to combine
      // the tick and the response.
      .mergeMap(([tick]) => mock(), ([tick], resp) => [tick, resp])

      // Update the subject's state.
      .do(([tick]) => subject.next({ tick, pending: false }))
      
      // Map the response.
      .map(([tick, resp]) => resp);
  });
}

poll().take(delays.length).subscribe(r => console.log(r));

.as-console-wrapper { max-height: 100% !important; top: 0; }

<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

我只是想到有一个运算符可以做到这一点:exhaustMap.

It's just occurred to me that there is an operator that does exactly this: exhaustMap.

const delays = [100, 2000, 100, 3000];
const since = Date.now();
let index = 0;

function mock() {
  return Rx.Observable
    .of("res")
    .do(() => console.log("mock req at ", Date.now() - since, " ms"))
    .delay(delays[index++ % delays.length])
    .do(() => console.log("mock res at ", Date.now() - since, " ms"));
}

const poll = Rx.Observable
  .timer(0, 1000)
  .do(tick => console.log("tick", tick))
  .exhaustMap(() => mock());

poll.take(delays.length).subscribe(r => console.log(r));

.as-console-wrapper { max-height: 100% !important; top: 0; }

<script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

这篇关于rxjs对具有可变响应时间的端点进行定期轮询的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆