RxJava Observable.cache无效 [英] RxJava Observable.cache invalidate
问题描述
我正在尝试在Android环境中学习rxjava. 假设我有一个可观察对象,它发出网络调用的结果. 如果我理解正确,那么一种处理配置更改的普遍方法是:
I am trying to learn rxjava inside Android environment. Let's say I have an observable that emits the result of a network call. If I understood correctly, a widely common approach to deal with config changes is to:
-
将可观察对象存储在保留的片段/单例/应用程序对象中
store the observable in a retained fragment / singleton / application object
将缓存运算符应用于可观察的
apply the cache operator to the observable
订阅/取消订阅适当的生命周期处理程序
subscribe / unsubscribe in the proper lifecycle handlers
这样做,我们不会丢失可观察到的结果,一旦进行了新配置,该结果将被重新观察.
Doing this, we would not loose the result of the observable which will re-observerd once the new configuration took place.
现在,我的问题是:
是否有一种方法可以迫使可观察对象发出新值(并使已缓存的值无效)?每当我想要从网络中获取新数据时,我是否需要创建一个新的可观察对象(在android世界中这听起来不是一个坏习惯,因为这会使gc做额外的工作)?
Is there a way to force the observable to emit a new value (and invalidate the cached one)? Do I need to create a new observable every time I want fresh data from the network (which does not sound like a bad practice in android world because would make the gc do extra work)?
非常感谢
Federico
推荐答案
进行自定义的OnSubscribe
实现,以实现所需的功能:
Make a custom OnSubscribe
implementation that does what you want:
public static class OnSubscribeRefreshingCache<T> implements OnSubscribe<T> {
private final AtomicBoolean refresh = new AtomicBoolean(true);
private final Observable<T> source;
private volatile Observable<T> current;
public OnSubscribeRefreshingCache(Observable<T> source) {
this.source = source;
this.current = source;
}
public void reset() {
refresh.set(true);
}
@Override
public void call(Subscriber<? super T> subscriber) {
if (refresh.compareAndSet(true, false)) {
current = source.cache();
}
current.unsafeSubscribe(subscriber);
}
}
这段代码演示了用法,并表明缓存已被重置:
This bit of code demonstrates usage and shows that cache is essentially being reset:
Observable<Integer> o = Observable.just(1)
.doOnCompleted(() -> System.out.println("completed"));
OnSubscribeRefreshingCache<Integer> cacher =
new OnSubscribeRefreshingCache<Integer>(o);
Observable<Integer> o2 = Observable.create(cacher);
o2.subscribe(System.out::println);
o2.subscribe(System.out::println);
cacher.reset();
o2.subscribe(System.out::println);
输出:
completed
1
1
completed
1
通过这种方式,您可能会注意到.cache
直到完成才发出.这是应该由rxjava 1.0.14修复的错误.
By the way you may notice that .cache
doesn't emit till completion. This is a bug that should be fixed by rxjava 1.0.14.
就您的GC压力而言,每个运算符应用于Observable时通常会通过lift
或create
创建一个新的Observable.与创建新的Observable相关的基本成员状态是对onSubscribe
函数的引用. cache
与大多数不同之处在于,它在所有订阅中都具有状态,并且如果它具有很多状态并且经常被丢弃,则可能会给GC带来压力.即使您使用相同的可变数据结构来保持复位状态,GC清除后仍将不得不处理数据结构的内容,因此您可能不会获得太多收益.
In terms of your GC pressure concerns, every operator when applied to an Observable creates a new Observable usually via lift
or create
. The base member state associated with creating a new Observable is the reference to the onSubscribe
function. cache
is different from most in that it holds state across subscriptions and this holds potential for GC pressure if it holds a lot of state and is thrown away frequently. Even if you used the same mutable data structure to hold the state across resets GC would still have to deal with the contents of the data structure when cleared so you might not gain much.
RxJava cache
运算符是为多个并发订阅构建的.您可能可以想象,重置功能可能难以实现.如果您想进一步探索,一定要在RxJava github上提出一个问题.
The RxJava cache
operator is built for multiple concurrent subscriptions. You can probably imagine a reset functionality could prove problematic to implement. By all means raise an issue on RxJava github if you want to explore further.
这篇关于RxJava Observable.cache无效的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!