引起原因:rx.exceptions.MissingBackpressureException [英] Caused by: rx.exceptions.MissingBackpressureException

查看:724
本文介绍了引起原因:rx.exceptions.MissingBackpressureException的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我还有其他问题.这次我在执行此代码期间遇到此错误Caused by: rx.exceptions.MissingBackpressureException:

I have an other problem. This time I am facing this error Caused by: rx.exceptions.MissingBackpressureException during the execution of this code:

class UpdateHelper {
val numberOfFileToUpdate: PublishSubject<Int>

init {
    numberOfFileToUpdate = PublishSubject.create()
}

public fun startUpdate(): Observable<Int>{
    return getProducts().flatMap { products: ArrayList<Product> ->
            numberOfFileToUpdate.onNext(products.size)
            return@flatMap saveRows(products)
        }
}

private fun getProducts(): Observable<ArrayList<Product>> {
    return Observable.create {
        var products: ArrayList<Product> = ArrayList()
        var i = 0
        while (i++ < 100) {
            products.add(Product())
        }

        it.onNext(products)
        it.onCompleted()
    }
}


private fun saveRows(products: ArrayList<Product>): Observable<Int> {
    return Observable.create<Int> {
        var totalNumberOfRow = products.size

        while (totalNumberOfRow-- > 0){
            it.onNext(products.size - totalNumberOfRow)
            Thread.sleep(100)
        }
        it.onCompleted()
    }
}

}

该代码只是两个过程的测试代码.第一个过程从Web获取Product列表,然后将这些产品持久保存到应用程序中的本地数据库中.这是主要思想.

The code is just a test code of two process. The first process gets a list of Product from the web, then those products are persist into the a local database within the app. This is the main idea.

方法getProducts完成获取数据的工作,在这种情况下,我只创建了一个包含100个产品的ArrayList. saveRows执行持久性工作.

The method getProducts do the work of getting the data, in this case I just create an ArrayList of 100 products. The saveRows do the persist work.

saveRows方法发出突出显示表示已保存行的Int.我这样做是因为在UI中,我有一个进度条来报告进度.

The saveRows methods emit juts an Int that represent the saved row. I am doing this because in the UI I have a progressbar reporting the progress.

从应用程序的另一端,我调用方法startUpdate,并在发出一些项目后得到描述异常

From an other point of the app I call the method startUpdate and after a few items emitted I get the describe exception

at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:46)

at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:40)

我了解为什么应该发生此异常https://github.com/ReactiveX/RxJava/wiki/Backpressure,但是我不知道自己在做什么错或如何解决.

I understand why this exception should be happening https://github.com/ReactiveX/RxJava/wiki/Backpressure but I do not know what I am doing wrong or how to solve it.

有人可以建议我吗?

推荐答案

问题是您的Observable源发出的速度比使用者消耗的速度快.保存每个产品需要100毫秒.您可以添加onBackpressureBuffer().

The problem is your Observable source emits faster than the consumer consumes. It takes 100 ms to save each product. You can add onBackpressureBuffer().

UpdateHelper().startUpdate()
    .onBackpressureBuffer() // Add this
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe({
      Log.d(TAG, "next $it")
    }, {
      Log.d(TAG, it.message)
    }, {
    })

此外,您可以尝试删除Thread.sleep(100).

Also, you can try removing Thread.sleep(100).

flatmap使用 OperatorMerge (merge(map(func))):在您的情况下,您可以看到map的onNexts发送的速度比要求的要快.

flatmap use OperatorMerge ( merge(map(func))): you can see that in your case, map's onNexts are sent faster than have been requested.

这篇关于引起原因:rx.exceptions.MissingBackpressureException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆