使用onBackpressureLatest丢弃中间消息以阻止Flowable [英] Using `onBackpressureLatest` to drop intermediate messages in blocking Flowable

查看:445
本文介绍了使用onBackpressureLatest丢弃中间消息以阻止Flowable的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一条链,可以在其中执行一些阻塞的IO调用(例如HTTP调用).我希望阻塞调用使用一个值,继续执行而不中断,但同时丢弃所有堆积的内容,然后以相同的方式使用下一个值.

I have a chain where I do some blocking IO calls (e.g. HTTP-call). I want the blocking call to consume a value, proceed without interrupting, but drop everything that is piling up meanwhile, and then consume the next value in the same manner.

请考虑以下示例:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS).onBackpressureLatest().map {
    Thread.sleep(1000)
    it
  }.blockingForEach { println(it) }
}

从幼稚的角度来看,我希望它会打印类似0, 10, 20, ...的内容,但它会打印0, 1, 2, ....

From a naive point of view, I would it expect to print something like 0, 10, 20, ..., but it prints 0, 1, 2, ....

我在做什么错了?

我想过要天真地添加debounce来吞噬传入流:

I thought about naively adding debounce to eat up the incoming stream:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS)
    .debounce(0, TimeUnit.MILLISECONDS)
    .map {
      Thread.sleep(1000)
      it
    }
    .blockingForEach { println(it) }
}

但是,现在我得到一个java.lang.InterruptedException: sleep interrupted.

But, now I get a java.lang.InterruptedException: sleep interrupted.

似乎可行的方法如下:

fun main() {
  Flowable.interval(100, TimeUnit.MILLISECONDS)
    .throttleLast(0, TimeUnit.MILLISECONDS)
    .map {
      Thread.sleep(1000)
      it
    }
    .blockingForEach { println(it) }
}

输出符合预期0, 10, 20, ... !!

这是正确的方法吗?

我注意到throttleLast将切换到Computation-Scheduler.有没有办法回到原始计划程序?

I noted that throttleLast will switch to the Computation-Scheduler. Is there a way to go back to the original scheduler?

我偶尔也会得到带有该变体的java.lang.InterruptedException: sleep interrupted.

I also get an occasional java.lang.InterruptedException: sleep interrupted with that variant.

推荐答案

解决问题的最简单方法是:

The most simple approach to solve the problem is:

fun <T> Flowable<T>.lossy() : Flowable<T> {
  return onBackpressureLatest().observeOn(Schedulers.io(), false, 1)
}

通过在Flowable上调用lossy,它开始删除所有传入的元素,其速度比下游使用者可以处理的速度快.

By calling lossy on a Flowable it starts to drop all element that are coming in faster than the downstream consumer can process.

这篇关于使用onBackpressureLatest丢弃中间消息以阻止Flowable的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆