RxJava Observable.create包装可观察的订阅 [英] RxJava Observable.create wrapping observable subscriptions

查看:179
本文介绍了RxJava Observable.create包装可观察的订阅的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用了Observable.create,因此可以在某些数据可用时通知订阅者.我不确定在我的create方法中订阅可观察对象.这些嵌套订阅会给我带来什么问题吗?我对使用Observable.create创建可观察对象并不完全熟悉,因此我想确保自己不会做任何异常或滥用它.预先谢谢你!

I used Observable.create so I could notify the subscriber when certain data was available. I am a little uncertain of subscribing to observables inside of my create method. Are these nested subscriptions going to give me any sort of issue? I'm not completely familiar with creating observables using Observable.create so I wanted to make sure I'm not doing anything out of the ordinary or misusing it. Thank you in advance!

abstract class NetworkResource<ApiType, DbType> constructor(private val schedulerProvider: SchedulerProvider) {

    abstract fun fetchFromApi(): Single<ApiType>
    abstract fun fetchFromDb(): Observable<Optional<DbType>>
    abstract fun saveToDb(apiType: ApiType?)
    abstract fun shouldFetchFromApi(cache: DbType?): Boolean

    fun fetch(): Observable<Optional<DbType>>  {
        return Observable.create<Optional<DbType>> {
            val subscriber = it

            fetchFromDb()
                    .subscribe({
                        subscriber.onNext(it)

                        if(shouldFetchFromApi(it.get())) {
                            fetchFromApi()
                                    .observeOn(schedulerProvider.io())
                                    .map {
                                        saveToDb(it)
                                        it
                                    }
                                    .observeOn(schedulerProvider.ui())
                                    .flatMapObservable {
                                        fetchFromDb()
                                    }
                                    .subscribe({
                                        subscriber.onNext(it)
                                        subscriber.onComplete()
                                    })
                        }
                        else {
                            subscriber.onComplete()
                        }
                    })

        }
    }
}

推荐答案

是的,它将引起问题.

Yes, it will cause an issues.

首先,嵌套这样的Observable并不是惯用的,这是Reactive方法的优势之一,是组成Observables,因此只有一个干净的流.这样,您就打破了链条,直接的结果是交织在一起的代码更难阅读,并且有更多代码连接通知事件,基本上就像用Observable包装异步回调方法一样.
在这里,因为您已经具有反应性组件,所以只需将它们组成即可,而不必使用回调方法进行处理.

First, it is not idiomatic to nest Observable like this, one of the strengths of Reactive approach, is composing Observables, and thus have single clean stream. with this way, you are breaking the chain, and the immediate result is intertwined code which is harder to read, and more code to wire up the notification events, basically it is like wrapping async callback methods with Observable.
here as you have already reactive components you can simply compose them instead of treating them with callback approach.

第二,由于断开了链,因此是最严重和最直接的-取消订阅外部的Observable不会自动影响内部的Observable.尝试添加subscribeOn()的情况也是如此,并且在背压很重要的其他情况下也是如此.

Second, as a result of breaking the chain, the most sever and immediate one - unsubscribing the outer Observable will not affect automatically the inner Observable. same goes for trying to add subscribeOn() and with different scenario where backpressure is important it's also apply.

一个合成的替代方案可能是这样的:

an composing alternative might be something like this:

fun fetch2(): Observable<Optional<DbType>> {
        return fetchFromDb()
                .flatMap {
                    if (shouldFetchFromApi(it.get())) {
                        fetchFromApi()
                                .observeOn(schedulerProvider.io())
                                .doOnSuccess { saveToDb(it) }
                                .observeOn(schedulerProvider.ui())
                                .flatMapObservable {
                                    fetchFromDb()
                                }

                    } else {
                        Observable.empty()
                    }
                }
    }

如果出于某种原因,您希望在任何情况下都单独发出第一个fetchFromDb()结果,则也可以将publish()与选择器一起使用:

if from some reason, you want in any case the first fetchFromDb() result to be emitted separately, you can also do it using publish() with selector:

 fun fetch2(): Observable<Optional<DbType>> {
    return fetchFromDb()
            .publish {
                Observable.merge(it,
                        it.flatMap {
                            if (shouldFetchFromApi(it.get())) {
                                fetchFromApi()
                                        .observeOn(schedulerProvider.io())
                                        .doOnSuccess { saveToDb(it) }
                                        .observeOn(schedulerProvider.ui())
                                        .flatMapObservable {
                                            fetchFromDb()
                                        }

                            } else {
                                Observable.empty()
                            }
                        })
            }

}

这篇关于RxJava Observable.create包装可观察的订阅的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆