引起原因:rx.exceptions.MissingBackpressureException [英] Caused by: rx.exceptions.MissingBackpressureException
问题描述
我还有其他问题.这次我在执行此代码期间遇到此错误Caused by: rx.exceptions.MissingBackpressureException
:
I have an other problem. This time I am facing this error Caused by: rx.exceptions.MissingBackpressureException
during the execution of this code:
class UpdateHelper {
val numberOfFileToUpdate: PublishSubject<Int>
init {
numberOfFileToUpdate = PublishSubject.create()
}
public fun startUpdate(): Observable<Int>{
return getProducts().flatMap { products: ArrayList<Product> ->
numberOfFileToUpdate.onNext(products.size)
return@flatMap saveRows(products)
}
}
private fun getProducts(): Observable<ArrayList<Product>> {
return Observable.create {
var products: ArrayList<Product> = ArrayList()
var i = 0
while (i++ < 100) {
products.add(Product())
}
it.onNext(products)
it.onCompleted()
}
}
private fun saveRows(products: ArrayList<Product>): Observable<Int> {
return Observable.create<Int> {
var totalNumberOfRow = products.size
while (totalNumberOfRow-- > 0){
it.onNext(products.size - totalNumberOfRow)
Thread.sleep(100)
}
it.onCompleted()
}
}
}
该代码只是两个过程的测试代码.第一个过程从Web获取Product
列表,然后将这些产品持久保存到应用程序中的本地数据库中.这是主要思想.
The code is just a test code of two process. The first process gets a list of Product
from the web, then those products are persist into the a local database within the app. This is the main idea.
方法getProducts
完成获取数据的工作,在这种情况下,我只创建了一个包含100个产品的ArrayList. saveRows
执行持久性工作.
The method getProducts
do the work of getting the data, in this case I just create an ArrayList of 100 products. The saveRows
do the persist work.
saveRows
方法发出突出显示表示已保存行的Int
.我这样做是因为在UI中,我有一个进度条来报告进度.
The saveRows
methods emit juts an Int
that represent the saved row. I am doing this because in the UI I have a progressbar reporting the progress.
从应用程序的另一端,我调用方法startUpdate
,并在发出一些项目后得到描述异常
From an other point of the app I call the method startUpdate
and after a few items emitted I get the describe exception
at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:46)
at com.techbyflorin.rockapan.helpers.UpdateHelper$saveRows$1.call(UpdateHelper.kt:40)
我了解为什么应该发生此异常https://github.com/ReactiveX/RxJava/wiki/Backpressure
,但是我不知道自己在做什么错或如何解决.
I understand why this exception should be happening https://github.com/ReactiveX/RxJava/wiki/Backpressure
but I do not know what I am doing wrong or how to solve it.
有人可以建议我吗?
推荐答案
问题是您的Observable源发出的速度比使用者消耗的速度快.保存每个产品需要100毫秒.您可以添加onBackpressureBuffer().
The problem is your Observable source emits faster than the consumer consumes. It takes 100 ms to save each product. You can add onBackpressureBuffer().
UpdateHelper().startUpdate()
.onBackpressureBuffer() // Add this
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
Log.d(TAG, "next $it")
}, {
Log.d(TAG, it.message)
}, {
})
此外,您可以尝试删除Thread.sleep(100)
.
Also, you can try removing Thread.sleep(100)
.
flatmap使用 OperatorMerge (merge(map(func))
):在您的情况下,您可以看到map
的onNexts发送的速度比要求的要快.
flatmap use OperatorMerge ( merge(map(func))
): you can see that in your case, map
's onNexts are sent faster than have been requested.
这篇关于引起原因:rx.exceptions.MissingBackpressureException的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!