MissingBackpressureException:由于缺少请求而无法发出缓冲区 [英] MissingBackpressureException: Could not emit buffer due to lack of requests

查看:189
本文介绍了MissingBackpressureException:由于缺少请求而无法发出缓冲区的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我收到了一个错误报告,包括 MissingBackpressureException:由于缺乏请求而无法发出缓冲区 RxJava Flowable,但我正在努力创建一个简单的测试用例演示了问题(维护Flowable的结构).

I've received a bug report including MissingBackpressureException: Could not emit buffer due to lack of requests for an RxJava Flowable, but I'm struggling to create a simple test case the demonstrates the problem (maintaining the structure of the Flowable).

这是我正在尝试组合的测试,它在管道中保持相同的阶段:

Here's the test I'm trying to put together, which maintains the same stages in the pipeline:

int inputEvents=10000;

CountDownLatch completed = new CountDownLatch(1);
Flowable<List<String>> flowable = Flowable.<String>create(e -> {

    System.out.println(Thread.currentThread().getName() + ": Will send");
    for (int counter = 0; counter < inputEvents; counter++) {
        e.onNext("" + counter);
        Thread.sleep(5);
    }
    System.out.println(Thread.currentThread().getName() + ": Completed sending");
    e.onComplete();
}, BackpressureStrategy.DROP)
    .onBackpressureDrop(s -> System.out.println("Backpressure, dropping " + Arrays.asList(s)))
    .buffer(1, TimeUnit.SECONDS)
    .doOnNext(strings -> System.out.println("\t" + Thread.currentThread().getName() + ": Buffered: " + strings.size() + " items"))
    .observeOn(Schedulers.io(), false)
    .doOnNext(strings -> {
        System.out.println("\t" + "\t" + Thread.currentThread().getName() + ": Waiting: " + strings.size());
        Thread.sleep(5000);
    });

flowable
    .subscribe(s -> System.out.println("\t" + "\t" + "onNext: " + s.size()),
            error -> {
                throw new RuntimeException(error);
            },
            () -> {
                System.out.println("\t" + "\t" + "Complete");
                completed.countDown();
            });

completed.await();

在生产中,我们得到MissingBackpressureException:由于缺少请求而无法发出缓冲区,堆栈跟踪如下:

In production, we get MissingBackpressureException: Could not emit buffer due to lack of requests with the following stack trace:

io.reactivex.rxjava3.exceptions.MissingBackpressureException: Could not emit buffer due to lack of requests
        at io.reactivex.rxjava3.internal.subscribers.QueueDrainSubscriber.fastPathEmitMax(QueueDrainSubscriber.java:87)
        at io.reactivex.rxjava3.internal.operators.flowable.FlowableBufferTimed$BufferExactUnboundedSubscriber.run(FlowableBufferTimed.java:207)
        at io.reactivex.rxjava3.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:39)
        at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
        at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

所以我认为这与缓冲区的下游工作有关.

So I think this relates to downstream work from the buffer.

然而,无论我在 doOnNext 中阻塞多久,我都无法重现该问题.示例输出:

However, no matter how long I block in the doOnNext I can't reproduce the problem. Example output:

main: Will send
    RxComputationThreadPool-1: Buffered: 197 items
        RxCachedThreadScheduler-1: Waiting: 197
    RxComputationThreadPool-1: Buffered: 196 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 196 items
    RxComputationThreadPool-1: Buffered: 197 items
        onNext: 197
        RxCachedThreadScheduler-1: Waiting: 196
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 196 items
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
        onNext: 196
        RxCachedThreadScheduler-1: Waiting: 197
    RxComputationThreadPool-1: Buffered: 197 items
    RxComputationThreadPool-1: Buffered: 197 items
...

我在预料到,由于 Thread.sleep(5000) 需要很长时间,我们会承受压力.

I was expecting, as the Thread.sleep(5000) takes so long, we would get back pressure.

有没有办法模拟这一点,最好是在使用 TestScheduler/TestSubscriber 的测试中(以避免 Thread.sleep()s)?

Is there a way of simulating this, ideally in a test using TestScheduler/TestSubscriber (to avoid the Thread.sleep()s)?

推荐答案

我能够通过增加事件发出的速率、增加事件的最大数量和降低消费者的速率来重现 MissingBackpressureException处理它们.

I was able to reproduce the MissingBackpressureException by increasing the rate at which your events are emitted, increasing the max number of events, and reducing the rate at which the consumer processes them.

溢出的缓冲区是默认的 observeOn(...) 运算符的大小为 128 的缓冲区.由于它每秒接收一次新列表,因此至少需要几分钟的时间在溢出之前施加压力.

The buffer that's overflowing is the default observeOn(...) operator's buffer of size 128. Since it's receiving a new list once a second, it will take at least a couple of minutes of back pressure before it will overflow.

注意,您可以通过将其作为参数传递给 observeOn(...) 来覆盖此默认缓冲区大小.

Note, you can override this default buffer size by passing it as an arg to observeOn(...).

回到背压处理,我认为您的管道的主要问题是 buffer(1, TimeUnit.SECONDS) 运算符.如果您查看 javadoc:

Getting back to backpressure handling, I think the main issue with your pipeline is the buffer(1, TimeUnit.SECONDS) operator. If you look at the javadoc:

背压:这个操作符不支持背压,因为它使用时间.它向上游请求 Long.MAX_VALUE 并且不服从下游请求.

Backpressure:This operator does not support backpressure as it uses time. It requests Long.MAX_VALUE upstream and does not obey downstream requests.

由于上述原因,您的 onBackPressureDrop(...) 永远不会被调用.我认为您可以通过在 buffer(...) 之后放置 onBackPressureDrop(...) 来解决这个问题.这样做会导致您的 Backpressure, drop... 消息.

As a result of the above your onBackPressureDrop(...) never gets invoked. I think you fix this by placing onBackPressureDrop(...) after buffer(...). Doing so results in your Backpressure, dropping... message.

您应该能够使用以下方法对其进行单元测试:TestScheduler.advanceTimeBy(long, TimeUnit).虽然我不得不承认,我还没有尝试过.

You should be able to unit test this using: TestScheduler.advanceTimeBy(long, TimeUnit). Though I have to admit, I haven't tried it yet.

这篇关于MissingBackpressureException:由于缺少请求而无法发出缓冲区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆