RxJS,如何轮询 API 以使用动态时间戳持续检查更新的记录 [英] RxJS, how to poll an API to continuously check for updated records using a dynamic timestamp

查看:28
本文介绍了RxJS,如何轮询 API 以使用动态时间戳持续检查更新的记录的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是 RxJS 的新手,我正在尝试编写一个可以完成以下任务的应用程序:

I am new to RxJS and I am trying to write an app that will accomplish the following things:

  1. 在加载时,发出 AJAX 请求(为了简单起见,伪装成 fetchItems())以获取项目列表.
  2. 此后每一秒,发出 AJAX 请求以获取项目.
  3. 检查新项目时,仅应返回最近时间戳之后更改的项目.
  4. 在 observable 之外不应该有任何状态.

我的第一次尝试非常直接,实现了目标 1、2 和 4.

My first attempt was very straight forward and met goals 1, 2 and 4.

var data$ = Rx.Observable.interval(1000)
  .startWith('run right away')
  .map(function() { 
    // `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`, but
    // I am not yet tracking the `modifiedSince` timestamp yet so all items will always be returned
    return fetchItems(); 
  });

现在我很兴奋,这很容易,要实现目标 3 再难了……几个小时后,这是 我现在所处的位置:

Now I'm excited, that was easy, it can't be that much harder to meet goal 3...several hours later this is where I am at:

var modifiedSince = null;
var data$ = Rx.Observable.interval(1000)
  .startWith('run right away')
  .flatMap(function() { 
    // `fetchItems(modifiedSince)` returns an array of items modified after `modifiedSince`
    return fetchItems(modifiedSince);
  })
  .do(function(item) {
    if(item.updatedAt > modifiedSince) {
      modifiedSince = item.updatedAt;
    }
  })
  .scan(function(previous, current) {
    previous.push(current);
    return previous;
  }, []);

这解决了目标 3,但在目标 4 上回归.我现在将状态存储在 observable 之外.

This solves goal 3, but regresses on goal 4. I am now storing state outside of the observable.

我假设全局 modifiedSince.do() 块不是实现此目的的最佳方式.任何指导将不胜感激.

I'm assuming that global modifiedSince and the .do() block aren't the best way of accomplishing this. Any guidance would be greatly appreciated.

希望通过这个问题澄清我正在寻找的内容.

hopefully clarified what I am looking for with this question.

推荐答案

这是另一种不使用闭包或外部状态"的解决方案.

Here is another solution which does not use closure or 'external state'.

我做了以下假设:

  • fetchItems 返回一个 Rx.Observable 项目,即不是项目数组
  • fetchItems returns a Rx.Observable of items, i.e. not an array of items

它利用了 expand 运算符,该运算符允许发出遵循 x_n+1 = f(x_n) 类型的递归关系的值.您可以通过返回一个发出该值的 observable 来传递 x_n+1,例如 Rx.Observable.return(x_n+1) 并且您可以通过返回 来完成递归>Rx.Observable.empty().在这里,您似乎没有结束条件,因此将永远运行.

It makes use of the expand operator which allows to emit values which follow a recursive relationship of the type x_n+1 = f(x_n). You pass x_n+1 by returning an observable which emits that value, for instance Rx.Observable.return(x_n+1) and you can finish the recursion by returning Rx.Observable.empty(). Here it seems that you don't have an ending condition so this will run forever.

scan 还允许根据递归关系发出值 (x_n+1 = f(x_n, y_n)).不同之处在于 scan 强制您使用同步函数(因此 x_n+1y_n 同步),而使用 expand 你可以使用 observable 形式的异步函数.

scan also allows to emit values following a recursive relationship (x_n+1 = f(x_n, y_n)). The difference is that scan forces you to use a syncronous function (so x_n+1 is synchronized with y_n), while with expand you can use an asynchronous function in the form of an observable.

代码没有经过测试,所以如果这行得通,请让我更新.

Code is not tested, so keep me updated if this works or not.

相关文档:expand, combineLatest

var modifiedSinceInitValue = // put your date here
var polling_frequency = // put your value here
var initial_state = {modifiedSince: modifiedSinceInitValue, itemArray : []}
function max(property) {
  return function (acc, current) {
    acc = current[property] > acc ? current[property] : acc;
  }
}    
var data$ = Rx.Observable.return(initial_state)
  .expand (function(state){
             return fetchItem(state.modifiedSince)
                   .toArray()
                   .combineLatest(Rx.Observable.interval(polling_frequency).take(1), 
                     function (itemArray, _) {
                       return {
                         modifiedSince : itemArray.reduce(max('updatedAt'), modifiedSinceInitValue), 
                         itemArray : itemArray
                       }
                     }

  })

这篇关于RxJS,如何轮询 API 以使用动态时间戳持续检查更新的记录的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆