使用onBackpressureLatest丢弃中间消息以阻止Flowable [英] Using `onBackpressureLatest` to drop intermediate messages in blocking Flowable
问题描述
我有一条链,可以在其中执行一些阻塞的IO调用(例如HTTP调用).我希望阻塞调用使用一个值,继续执行而不中断,但同时丢弃所有堆积的内容,然后以相同的方式使用下一个值.
I have a chain where I do some blocking IO calls (e.g. HTTP-call). I want the blocking call to consume a value, proceed without interrupting, but drop everything that is piling up meanwhile, and then consume the next value in the same manner.
请考虑以下示例:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureLatest().map {
Thread.sleep(1000)
it
}.blockingForEach { println(it) }
}
从幼稚的角度来看,我希望它会打印类似0, 10, 20, ...
的内容,但它会打印0, 1, 2, ...
.
From a naive point of view, I would it expect to print something like 0, 10, 20, ...
, but it prints 0, 1, 2, ...
.
我在做什么错了?
我想过要天真地添加debounce
来吞噬传入流:
I thought about naively adding debounce
to eat up the incoming stream:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS)
.debounce(0, TimeUnit.MILLISECONDS)
.map {
Thread.sleep(1000)
it
}
.blockingForEach { println(it) }
}
但是,现在我得到一个java.lang.InterruptedException: sleep interrupted
.
But, now I get a java.lang.InterruptedException: sleep interrupted
.
似乎可行的方法如下:
fun main() {
Flowable.interval(100, TimeUnit.MILLISECONDS)
.throttleLast(0, TimeUnit.MILLISECONDS)
.map {
Thread.sleep(1000)
it
}
.blockingForEach { println(it) }
}
输出符合预期0, 10, 20, ...
!!
这是正确的方法吗?
我注意到throttleLast
将切换到Computation-Scheduler.有没有办法回到原始计划程序?
I noted that throttleLast
will switch to the Computation-Scheduler. Is there a way to go back to the original scheduler?
我偶尔也会得到带有该变体的java.lang.InterruptedException: sleep interrupted
.
I also get an occasional java.lang.InterruptedException: sleep interrupted
with that variant.
推荐答案
解决问题的最简单方法是:
The most simple approach to solve the problem is:
fun <T> Flowable<T>.lossy() : Flowable<T> {
return onBackpressureLatest().observeOn(Schedulers.io(), false, 1)
}
通过在Flowable
上调用lossy
,它开始删除所有传入的元素,其速度比下游使用者可以处理的速度快.
By calling lossy
on a Flowable
it starts to drop all element that are coming in faster than the downstream consumer can process.
这篇关于使用onBackpressureLatest丢弃中间消息以阻止Flowable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!