RxJs:轮询直到间隔完成或收到正确的数据 [英] RxJs: poll until interval done or correct data received

查看:97
本文介绍了RxJs:轮询直到间隔完成或收到正确的数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我如何使用 RxJs 在浏览器中执行以下场景:

How do i execute the following scenario in the browser with RxJs:

  • 提交数据到队列进行处理
  • 取回作业 ID
  • 每 1 秒轮询另一个端点,直到结果可用或 60 秒过去(然后失败)

我提出的中间解决方案:

Intermediate solution that i've come up with:

 Rx.Observable
    .fromPromise(submitJobToQueue(jobData))
    .flatMap(jobQueueData => 
      Rx.Observable
            .interval(1000)
            .delay(5000)
            .map(_ => jobQueueData.jobId)
            .take(55)
    )
    .flatMap(jobId => Rx.Observable.fromPromise(pollQueueForResult(jobId)))
    .filter(result => result.completed)
    .subscribe(
      result => console.log('Result', result),
      error =>  console.log('Error', error)
    );

  1. 有没有办法在没有中间变量的情况下在数据到达或发生错误时停止计时器?我现在可以引入新的 observable 然后使用 takeUntil
  2. flatMap 在这里的用法在语义上是否正确?也许这整个事情应该重写而不是与 flatMap 链接?
  1. Is there a way without intermediate variables to stop the timer once the data arrives or error occurs? I now i could introduce new observable and then use takeUntil
  2. Is flatMap usage here semantically correct? Maybe this whole thing should be rewritten and not chained with flatMap ?

推荐答案

从顶部开始,您已承诺将变成可观察对象.一旦这产生一个值,您希望每秒调用一次,直到您收到某个响应(成功)或直到某个时间段过去.我们可以将这个解释的每一部分映射到一个 Rx 方法:

Starting from the top, you've got a promise that you turn into an observable. Once this yields a value, you want make a call once per second until you receive a certain response (success) or until a certain amount of time has passed. We can map each part of this explanation to an Rx method:

一旦产生一个值" = map/flatMap (在这种情况下是 flatMap 因为接下来的也是可观察的,并且我们需要把它们弄平)

"Once this yields a value" = map/flatMap (flatMap in this case because what comes next will also be observables, and we need to flatten them out)

每秒一次" = 间隔

收到某个响应" = filter

"or" = amb

已经过了一定的时间" = timer

"certain amount of time has passed" = timer

从那里,我们可以像这样拼凑起来:

From there, we can piece it together like so:

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .filter(x => x.completed)
      .take(1)
      .map(() => 'Completed')
      .amb(
        Rx.Observable.timer(60000)
          .flatMap(() => Rx.Observable.throw(new Error('Timeout')))
      )
  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  )
;

一旦我们得到初步结果,我们就将其投射到两个 observable 之间的竞争中,一个在收到成功响应时产生一个值,一个在经过一定时间后产生一个值.第二个 flatMap 是因为 .throw 不存在于 observable 实例中,并且 Rx.Observable 上的方法返回一个也需要的 observable被夷为平地.

Once we've got our initial result, we project that into a race between two observables, one that will yield a value when it receives a successful response, and one that will yield a value when a certain amount of time has passed. The second flatMap there is because .throw isn't present on observable instances, and the method on Rx.Observable returns an observable which also needs to be flattened out.

事实证明 amb/timer 组合实际上可以被 timeout 替换,如下所示:

It turns out that the amb / timer combo can actually be replaced by timeout, like so:

Rx.Observable
  .fromPromise(submitJobToQueue(jobData))
  .flatMap(jobQueueData =>
    Rx.Observable.interval(1000)
      .flatMap(() => pollQueueForResult(jobQueueData.jobId))
      .filter(x => x.completed)
      .take(1)
      .map(() => 'Completed')
      .timeout(60000, Rx.Observable.throw(new Error('Timeout')))
  )
  .subscribe(
    x => console.log('Result', x),
    x => console.log('Error', x)
  )
;

我省略了示例中的 .delay,因为它没有在您想要的逻辑中描述,但它可以很容易地适应这个解决方案.

I omitted the .delay you had in your sample as it wasn't described in your desired logic, but it could be fitted trivially to this solution.

所以,直接回答你的问题:

So, to directly answer your questions:

  1. 在上面的代码中,不需要手动停止任何事情,因为 interval 将在订阅者计数下降到零的那一刻被处理掉,这将发生在 take(1)amb/timeout 完成.
  2. 是的,您原来的两种用法都是有效的,因为在这两种情况下,您都将 observable 的每个元素投影到一个新的 observable 中,并希望将 observable 的结果 observable 展平为常规 observable.
  1. In the code above there is no need to manually stop anything, as the interval will be disposed of the moment the subscriber count drops to zero, which will occur either when the take(1) or amb / timeout completes.
  2. Yes, both usages in your original were valid, as in both cases you were projecting each element of an observable into a new observable, and wanting to flatten the resultant observable of observables out into a regular observable.

这是 jsbin 我拼凑起来测试解决方案(您可以调整值在 pollQueueForResult 中返回以获得所需的成功/超时;为了快速测试,时间已除以 10).

Here's the jsbin I threw together to test the solution (you can tweak the value returned in pollQueueForResult to obtain the desired success/timeout; times have been divided by 10 for the sake of quick testing).

这篇关于RxJs:轮询直到间隔完成或收到正确的数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆