RxJava Observable.fromEmitter 奇怪的背压行为 [英] RxJava Observable.fromEmitter odd backpressure behaviour

查看:55
本文介绍了RxJava Observable.fromEmitter 奇怪的背压行为的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在使用 Observable.fromEmitter() 作为 Observable.create() 的绝佳替代品.我最近遇到了一些奇怪的行为,我不太明白为什么会这样.我真的很感谢对背压和调度程序有一定了解的人看看这个.

I've been making use of Observable.fromEmitter() as a fantastic alternative to Observable.create(). I've recently run into some weird behaviour and I can't quite work out why this is the case. I'd really appreciate someone with some knowledge on backpressure and schedulers taking a look at this.

public final class EmitterTest {
  public static void main(String[] args) {
    Observable<Integer> obs = Observable.fromEmitter(emitter -> {
      for (int i = 1; i < 1000; i++) {
        if (i % 5 == 0) {
          sleep(300L);
        }

        emitter.onNext(i);
      }

      emitter.onCompleted();
    }, Emitter.BackpressureMode.LATEST);

    obs.subscribeOn(Schedulers.computation())
        .observeOn(Schedulers.computation())
        .subscribe(value -> System.out.println("Received " + value)); // Why does this get stuck at "Received 128"

    sleep(10000L);
  }

  private static void sleep(Long duration) {
    try {
      Thread.sleep(duration);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
}

这个应用程序的输出是

Received 1
Received 2
...
Received 128

然后它仍然停留在 128(大概是因为这是 RxJava 的默认缓冲区大小).

Then it remains stuck at 128 (assumedly because this is RxJava's default buffer size).

如果我将 fromEmitter() 中指定的模式更改为 BackpressureMode.NONE,则代码按预期工作.如果我删除对 observeOn() 的调用,它也会按预期工作.有人能解释为什么会这样吗?

If I change the mode specified in fromEmitter() to BackpressureMode.NONE, then the code works as intended. If I remove the call to observeOn(), it also works as intended. Is anyone able to explain why this is the case?

推荐答案

这是同池死锁情况.subscribeOn 将下游 request 安排在它正在使用的同一线程上,但如果该线程忙于睡眠/发射循环,则请求永远不会传递到 fromEmitter 因此在一段时间后 LATEST 开始丢弃元素,直到最后一个值(999)被传递,如果主源等待的时间足够长.(这与我们删除的 onBackpressureBlock 的情况类似.)

This is a same-pool deadlock situation. subscribeOn schedules the downstream request on the same thread it is using but if that thread is busy with a sleep/emission loop, the request gets never delivered to fromEmitter and thus after some time LATEST starts to drop elements up until the very end where the very last value (999) is delivered if the main source waits long enough. (This is a similar situation with onBackpressureBlock we removed.)

如果 subscribeOn 没有执行此请求调度,该示例将正常工作.

If subscribeOn didn't do this scheduling of requests, the example would work propery.

我打开了一个问题来找出解决方案.

I've opened an issue to work out the solutions.

目前的解决方法是使用更大的缓冲区大小和 observeOn(有一个过载)或使用 fromEmitter(f, NONE).subscribeOn().onBackpressureLatest().observeOn()

The workaround for now is to use bigger buffer size with observeOn (there's an overload) or use fromEmitter(f, NONE).subscribeOn().onBackpressureLatest().observeOn()

这篇关于RxJava Observable.fromEmitter 奇怪的背压行为的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆