RxJava缓存数据 [英] RxJava and Cached Data

查看:142
本文介绍了RxJava缓存数据的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我还是相当新的RxJava,我用它在Android应用程序。我读过一吨的主题,但仍感觉我失去了一些东西。

I'm still fairly new to RxJava and I'm using it in an Android application. I've read a metric ton on the subject but still feel like I'm missing something.

我有以下情形:

我有存储在其中通过不同的服务连接(AIDL)访问系统数据,我需要从这个系统(的异步调用1-n个可能发生)检索数据。接收帮助我一吨简化这个code。然而,该整个过程往往需要几秒钟(向上的5秒+)因此,我需要缓存该数据,以加快本地应用。

I have data stored in the system which is accessed via various service connections (AIDL) and I need to retrieve data from this system (1-n number of async calls can happen). Rx has helped me a ton in simplifying this code. However, this entire process tends to take a few seconds (upwards of 5 seconds+) therefore I need to cache this data to speed up the native app.

在这一点上的要求是:

  1. 初始订阅,缓存将是空的,所以我们必须要等待所需的时间来加载。没什么大不了。在这以后,数据应被高速缓存。

  1. Initial subscription, the cache will be empty, therefore we have to wait the required time to load. No big deal. After that the data should be cached.

随后载荷应该从缓存中提取数据,但该数据应重新加载和磁盘缓存应该是在幕后。

Subsequent loads should pull the data from cache, but then the data should be reloaded and the disk cache should be behind the scenes.

问题:我要观测量 - A和B. A包含了从本地服务中提取数据(吨回事)嵌套观测量。 B是要简单得多。乙仅包含code从磁盘缓存中提取数据。

The Problem: I have to Observables - A and B. A contains the nested Observables that pull data from the local services (tons going on here). B is much simpler. B simply contains the code to pull the data from disk cache.

需要解决: 一)返回缓存项(如缓存),并继续重新加载磁盘高速缓存。 B)高速缓存是空的,从系统,它缓存加载数据并将其返回。后续调用返回到A。

Need to solve: a) Return a cached item (if cached) and continue to re-load the disk cache. b) Cache is empty, load the data from system, cache it and return it. Subsequent calls go back to "a".

我也曾有过一些人建议,如flatmap一些操作,合并,甚至科目,但由于某种原因,我在点与点之间的麻烦。

I've had a few folks recommend a few operations such as flatmap, merge and even subjects but for some reason I'm having trouble connecting the dots.

我怎样才能做到这一点?

How can I do this?

推荐答案

下面是关于如何做到这几个选项。我会尽力解释他们,尽我所能,我走。这是napkin- code和我使用Java8风格的lambda语法,因为我很懒,它是prettier。 :)

Here are a couple options on how to do this. I'll try to explain them as best I can as I go along. This is napkin-code, and I'm using Java8-style lambda syntax because I'm lazy and it's prettier. :)

  1. 一个主题,如 AsyncSubject ,将是完美的,如果你能在内存中保留这些作为实例的状态,虽然这听起来像你需要存储这些磁盘。但是,我认为这种做法是值得一提的,以防万一你能。此外,它只是一个漂亮的技术,就知道了。 AsyncSubject 是一个可观察的,只有发出公布它的最后一个值(一级学科既是观察和可观察到的),并只开始发射后的 onCompleted 被调用。因此,任何之后完成,订阅将接收的下一个值。

  1. A subject, like AsyncSubject, would be perfect if you could keep these as instance states in memory, although it sounds like you need to store these to disk. However, I think this approach is worth mentioning just in case you are able to. Also, it's just a nifty technique to know. AsyncSubject is an Observable that only emits the LAST value published to it (A Subject is both an Observer and an Observable), and will only start emitting after onCompleted has been called. Thus, anything that subscribes after that complete will receive the next value.

在这种情况下,你可以有(在应用程序级的应用程序类或其他单个实例):

In this case, you could have (in an application class or other singleton instance at the app level):

public class MyApplication extends Application {    
    private final AsyncSubject<Foo> foo = AsyncSubject.create();

    /** Asynchronously gets foo and stores it in the subject. */
    public void fetchFooAsync() {
        // Gets the observable that does all the heavy lifting.
        // It should emit one item and then complete.
        FooHelper.getTheFooObservable().subscribe(foo);
    }

    /** Provides the foo for any consumers who need a foo. */
    public Observable<Foo> getFoo() {
        return foo;
    }

}

  • 推迟了观测。 Observable.defer让你迫不及待地想创建一个可观察的,直到它被订阅。你可以用它来让磁盘缓存读取在后台运行,然后返回缓存的版本,或者,如果不在缓存,使真正的交易。

  • Deferring the Observable. Observable.defer lets you wait to create an Observable until it is subscribed to. You can use this to allow the disk cache fetch to run in the background, and then return the cached version or, if not in cache, make the real deal.

    该版本假设你吸气code,这两种缓存读取和非抓创作,是阻塞调用,没有观测和延迟可以在后台工作。例如:

    This version assumes that your getter code, both cache fetch and non- catch creation, are blocking calls, not observables, and the defer does work in the background. For example:

    public Observable<Foo> getFoo() {
        Observable.defer(() -> {
            if (FooHelper.isFooCached()) {
                return Observable.just(FooHelper.getFooFromCacheBlocking());
            }
            return Observable.just(FooHelper.createNewFooBlocking());
        }).subscribeOn(Schedulers.io());
    }
    

  • 使用 concatWith 。这里我们假设我们的方法从磁盘缓存得到美孚或者发出一个单一的项目和完成,要么刚刚完成且不发光,如果是空的。

  • Use concatWith and take. Here we assume our method to get the Foo from the disk cache either emits a single item and completes or else just completes without emitting, if empty.

    public Observable<Foo> getFoo() {
        return FooHelper.getCachedFooObservable()
                .concatWith(FooHelper.getRealFooObservable())
                .take(1);
    }
    

    这方法应该只尝试获取真实的交易,如果缓存观察到成品空。

    That method should only attempt to fetch the real deal if the cached observable finished empty.

    使用 AMB ambWith​​ 。这可能是一个疯狂的解决方案,但有趣地指出。 AMB 基本上采取一对夫妇(或以上的重载)观测和等待,直到其中一人发出一个项目,那么它完全放弃了其他观察到​​的,只是需要一个赢得比赛。这将是非常有用的唯一方法是,如果有可能创造一个新的富比从磁盘读取速度更快的计算步骤。在这种情况下,你可以做这样的事情:

    Use amb or ambWith. This is probably one the craziest solutions, but fun to point out. amb basically takes a couple (or more with the overloads) observables and waits until one of them emits an item, then it completely discards the other observable and just takes the one that won the race. The only way this would be useful is if it's possible for the computation step of creating a new Foo to be faster than fetching it from disk. In that case, you could do something like this:

    public Observable<Foo> getFoo() {
        return Observable.amb(
                FooHelper.getCachedFooObservable(),
                FooHelper.getRealFooObservable());
    }
    

  • 我还挺preFER选项3.至于实际缓存,你可能有这样的事情在入口点之一(preferably之前,我们要去需要美孚,自你说的这个是一个长期运行的操作),后来的消费者应该得到的缓存的版本,只要完成写作。使用 AsyncSubject 这里可能会有所帮助,以确保我们不会触发作品多次在等待其被写入。消费者只会得到完整的结果,但是,只有当它可以合理地围绕保持在内存中的作品。

    I kinda prefer Option 3. As far as actually caching it, you could have something like this at one of the entry points (preferably before we're gonna need the Foo, since as you said this is a long-running operation) Later consumers should get the cached version as long as it has finished writing. Using an AsyncSubject here may help as well, to make sure we don't trigger the work multiple times while waiting for it to be written. The consumers would only get the completed result, but again, that only works if it can be reasonably kept around in memory.

    if (!FooHelper.isFooCached()) {
        getFoo()
            .subscribeOn(Schedulers.io())
            .subscribe((foo) -> FooHelper.cacheTheFoo(foo));
    }
    

    需要注意的是,您应该保持周围意味着磁盘写入(和读取)一个线程调度器,并使用 .observeOn(富) .subscribeOn(...),或以其他方式同步访问磁盘缓存为prevent并发问题。

    Note that, you should either keep around a single thread scheduler meant for disk writing (and reading) and use .observeOn(foo) after .subscribeOn(...), or otherwise synchronize access to the disk cache to prevent concurrency issues.

    这篇关于RxJava缓存数据的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

    查看全文
    登录 关闭
    扫码关注1秒登录
    发送“验证码”获取 | 15天全站免登陆