如何在RxJS中实现时间到期热观察(或在Reactive Extensions中一般) [英] How to implement time expiry hot observable in RxJS (or general in Reactive Extensions)

查看:61
本文介绍了如何在RxJS中实现时间到期热观察(或在Reactive Extensions中一般)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想用RxJs实现Time Expiry缓存。以下是普通缓存的示例:

I'd like to implement Time Expiry cache with RxJs. Here is example of "normal" cache:

//let this represents "heavy duty job"
var data = Rx.Observable.return(Math.random() * 1000).delay(2000);

//and we want to cache result
var cachedData = new Rx.AsyncSubject();
data.subscribe(cachedData);

cachedData.subscribe(function(data){
    //after 2 seconds, result is here and data is cached
    //next subscribe returns immediately data
    cachedData.subscribe(function(data2){ /*this is "instant"*/ });
});

订阅 cachedData ,调用重载作业,2秒后结果保存在 cachedData AsyncSubject )。 cachedData 上的任何其他后续订阅会立即返回已保存的结果(因此缓存实施)。

When subscribe on cachedData is called for the first time, "heavy duty job" is called, and after 2 seconds result is saved in cachedData (AsyncSubject). Any other subsequent subscribe on cachedData returns immediately with saved result (thus cache implementation).

我想要达到的目的是在 cachedData 有效的时间段内加快这个问题,当那段时间过去了,我想重新为新数据运行重型工作,并在新的时间段内再次缓存这个......等等。

What I'd like to achieve is to "spice" this up with time period within cachedData is valid, and when that time passes, I'd like to re-run "heavy duty job" for new data and cache this again for new time period, etc...

期望的行为:

//pseudo code
cachedData.youShouldExpireInXSeconds(10);


//let's assume that all code is sequential from here

//this is 1.st run
cachedData.subscribe(function (data) {
    //this first subscription actually runs "heavy duty job", and
    //after 2 seconds first result data is here
});

//this is 2.nd run, just after 1.st run finished
cachedData.subscribe(function (data) {
    //this result is cached
});

//15 seconds later
// cacheData should expired
cachedData.subscribe(function (data) {
    //i'm expecting same behaviour as it was 1.st run:
    // - this runs new "heavy duty job"
    // - and after 2 seconds we got new data result
});


//....
//etc

我是Rx(Js)的新手,并且无法弄清楚如何用冷却来实现这个热的observable。

I'm new to Rx(Js) and cannot figure out how to implement this hot observable with cooldown.

推荐答案

您所缺少的是在一段时间后安排任务用新的 AsyncSubject 替换 cachedData 。以下是如何将其作为新的 Rx.Observable 方法:

All you are missing is to schedule a task to replace your cachedData with a new AsyncSubject after a time period. Here's how to do it as a new Rx.Observable method:

Rx.Observable.prototype.cacheWithExpiration = function(expirationMs, scheduler) {
    var source = this,
        cachedData = undefined;

    // Use timeout scheduler if scheduler not supplied
    scheduler = scheduler || Rx.Scheduler.timeout;

    return Rx.Observable.create(function (observer) {

        if (!cachedData) {
            // The data is not cached.
            // create a subject to hold the result
            cachedData = new Rx.AsyncSubject();

            // subscribe to the query
            source.subscribe(cachedData);

            // when the query completes, start a timer which will expire the cache
            cachedData.subscribe(function () {
                scheduler.scheduleWithRelative(expirationMs, function () {
                    // clear the cache
                    cachedData = undefined;
                });
            });
        }

        // subscribe the observer to the cached data
        return cachedData.subscribe(observer);
    });
};

用法:

// a *cold* observable the issues a slow query each time it is subscribed
var data = Rx.Observable.return(42).delay(5000);

// the cached query
var cachedData = data.cacheWithExpiration(15000);

// first observer must wait
cachedData.subscribe();

// wait 3 seconds

// second observer gets result instantly
cachedData.subscribe();

// wait 15 seconds

// observer must wait again
cachedData.subscribe();

这篇关于如何在RxJS中实现时间到期热观察(或在Reactive Extensions中一般)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆