RxJava Observable.cache无效 [英] RxJava Observable.cache invalidate

查看:173
本文介绍了RxJava Observable.cache无效的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试在Android环境中学习rxjava. 假设我有一个可观察对象,它发出网络调用的结果. 如果我理解正确,那么一种处理配置更改的普遍方法是:

I am trying to learn rxjava inside Android environment. Let's say I have an observable that emits the result of a network call. If I understood correctly, a widely common approach to deal with config changes is to:

  • 将可观察对象存储在保留的片段/单例/应用程序对象中

  • store the observable in a retained fragment / singleton / application object

缓存运算符应用于可观察的

apply the cache operator to the observable

订阅/取消订阅适当的生命周期处理程序

subscribe / unsubscribe in the proper lifecycle handlers

这样做,我们不会丢失可观察到的结果,一旦进行了新配置,该结果将被重新观察.

Doing this, we would not loose the result of the observable which will re-observerd once the new configuration took place.

现在,我的问题是:

是否有一种方法可以迫使可观察对象发出新值(并使已缓存的值无效)?每当我想要从网络中获取新数据时,我是否需要创建一个新的可观察对象(在android世界中这听起来不是一个坏习惯,因为这会使gc做额外的工作)?

Is there a way to force the observable to emit a new value (and invalidate the cached one)? Do I need to create a new observable every time I want fresh data from the network (which does not sound like a bad practice in android world because would make the gc do extra work)?

非常感谢

Federico

推荐答案

进行自定义的OnSubscribe实现,以实现所需的功能:

Make a custom OnSubscribe implementation that does what you want:

public static class OnSubscribeRefreshingCache<T> implements OnSubscribe<T> {

    private final AtomicBoolean refresh = new AtomicBoolean(true);
    private final Observable<T> source;
    private volatile Observable<T> current;

    public OnSubscribeRefreshingCache(Observable<T> source) {
        this.source = source;
        this.current = source;
    }

    public void reset() {
        refresh.set(true);
    }

    @Override
    public void call(Subscriber<? super T> subscriber) {
        if (refresh.compareAndSet(true, false)) {
            current = source.cache();
        }
        current.unsafeSubscribe(subscriber);
    }

}

这段代码演示了用法,并表明缓存已被重置:

This bit of code demonstrates usage and shows that cache is essentially being reset:

Observable<Integer> o = Observable.just(1)
        .doOnCompleted(() -> System.out.println("completed"));
OnSubscribeRefreshingCache<Integer> cacher = 
    new OnSubscribeRefreshingCache<Integer>(o);
Observable<Integer> o2 = Observable.create(cacher);
o2.subscribe(System.out::println);
o2.subscribe(System.out::println);
cacher.reset();
o2.subscribe(System.out::println);

输出:

completed
1
1
completed
1

通过这种方式,您可能会注意到.cache直到完成才发出.这是应该由rxjava 1.0.14修复的错误.

By the way you may notice that .cache doesn't emit till completion. This is a bug that should be fixed by rxjava 1.0.14.

就您的GC压力而言,每个运算符应用于Observable时通常会通过liftcreate创建一个新的Observable.与创建新的Observable相关的基本成员状态是对onSubscribe函数的引用. cache与大多数不同之处在于,它在所有订阅中都具有状态,并且如果它具有很多状态并且经常被丢弃,则可能会给GC带来压力.即使您使用相同的可变数据结构来保持复位状态,GC清除后仍将不得不处理数据结构的内容,因此您可能不会获得太多收益.

In terms of your GC pressure concerns, every operator when applied to an Observable creates a new Observable usually via lift or create. The base member state associated with creating a new Observable is the reference to the onSubscribe function. cache is different from most in that it holds state across subscriptions and this holds potential for GC pressure if it holds a lot of state and is thrown away frequently. Even if you used the same mutable data structure to hold the state across resets GC would still have to deal with the contents of the data structure when cleared so you might not gain much.

RxJava cache运算符是为多个并发订阅构建的.您可能可以想象,重置功能可能难以实现.如果您想进一步探索,一定要在RxJava github上提出一个问题.

The RxJava cache operator is built for multiple concurrent subscriptions. You can probably imagine a reset functionality could prove problematic to implement. By all means raise an issue on RxJava github if you want to explore further.

这篇关于RxJava Observable.cache无效的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆