RxJava构建缓存与android的observables [英] RxJava building cache with observables for android

查看:73
本文介绍了RxJava构建缓存与android的observables的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我很难理解如何使用RxJava构建缓存。我的想法是,我需要从内存缓存中获取数据或从我的数据库(dynamoDb)加载。但是,此缓存应该跨片段和/或线程共享。所以我需要返回当前正在运行但尚未完成的现有observable。这允许线程赶上而不做不必要的工作。我是RxJava的新手所以这就是我想到的草图(为了简洁而缺少一些代码):

I'm having a hard time understanding on how to build a cache using RxJava. The idea is that I need to either get data from a memory cache or load from my database (dynamoDb). However, this cache is supposed to be shared across fragments and or threads. So I need to return existing observables that are currently running and not finished. This allows threads to catch up and not do unnecessary work. I'm new to RxJava so this is what I have in mind as a sketch (missing some code for brevity):

public class DBCache<K, T> {
private final ConcurrentHashMap<K, Set<T>> resultCache = new ConcurrentHashMap<>;
private final ConcurrentHashMap<K, Observable<Set<T>>> observableCache = new ConcurrentHashMap<>;

private Observable<Set<T>> getFromCache(final DynamoDbCacheKey<K, T> query) {
    return Observable.create(new Observable.OnSubscribe<Set<T>>() {
        @Override
        public void call(Subscriber<? super Set<T>> subscriber) {
        Set<T> results = resultCache.get(query.getKey());
        if (results != null && results.size() > 0) {
            subscriber.onNext(results);
        }
        subscriber.onCompleted();
        }
    });
}

public Observable<Set<T>> get(final QueryCacheKey<K, T> query){
    Observable<Set<T>> cachedObservable = observableCache.get(query.getKey());
    if (cachedObservable != null) {
        return cachedObservable;
    }
    Observable<Set<T>> observable = Observable
        .concat(getFromCache(query), getFromNetwork(query))
        .first()
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .cache();
    observableCache.putIfAbsent(query.getKey(), observable);
return observable;
} 

private Observable<Set<T>> getFromNetwork(final QueryCacheKey<K, T> query) {
    return Observable.create(new Observable.OnSubscribe<Set<T>>() {
    @Override
    public void call(Subscriber<? super Set<T>> subscriber) {
        try {
            Set<T> results = loadFromDb(query); //omitted
            resultCache.putIfAbsent(query.getKey(), results);
            subscriber.onNext(results);
            subscriber.onCompleted();
            observableCache.remove(query.getKey());
        } catch (Exception exception) {
            subscriber.onError(exception);
        }
    }
    });
} 

}

有没有更好的方法通过RxJava实现这一点(对缓存策略不感兴趣)。有什么想法?

Is there a better way to achieve this through RxJava (Not interested in caching strategies). Any thoughts?

推荐答案

这是一个简单的缓存和检索值的例子:

Here is a simple example of doing caching and retrieving a value once:

public class RxCache<K, V> {

    final ConcurrentHashMap<K, AsyncSubject<V>> cache;

    final Func1<K, Observable<V>> valueGenerator;

    public RxCache(Func1<K, Observable<V>> valueGenerator) {
        this.valueGenerator = valueGenerator;
        this.cache = new ConcurrentHashMap<>();
    }

    public Observable<V> get(K key) {
        AsyncSubject<V> o = cache.get(key);
        if (o != null) {
            return o;
        }

        o = AsyncSubject.create();

        AsyncSubject<V> p = cache.putIfAbsent(key, o);
        if (p != null) {
            return p;
        }

        valueGenerator.call(key).subscribe(o);

        return o;
    }

    public void remove(K key) {
        cache.remove(key);
    }
}

如果您有多个值,请替换 AsyncSubject with ReplaySubject

If you have multiple values, replace AsyncSubject with ReplaySubject.

这篇关于RxJava构建缓存与android的observables的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆