仅使用 onBackpressureLatest() 消耗最新项目 [英] Only consume latest item with onBackpressureLatest()

查看:27
本文介绍了仅使用 onBackpressureLatest() 消耗最新项目的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个定期发出项目的生产者和一个有时很慢的消费者.重要的是,消费者只使用最近的项目.我认为 onBackpressureLatest() 是这个问题的完美解决方案.于是我写了下面的测试代码:

I have a producer which emits items periodically and a consumer which is sometimes quite slow. It is important that the consumer only works with recent items. I thought onBackpressureLatest() is the perfect solution for this problem. So I wrote the following test code:

PublishProcessor<Integer> source = PublishProcessor.create();
source
        .onBackpressureLatest()
        .observeOn(Schedulers.from(Executors.newCachedThreadPool()))
        .subscribe(i -> {
            System.out.println("Consume: " + i);
            Thread.sleep(100);
        });

for (int i = 0; i < 10; i++) {
    System.out.println("Produce: " + i);
    source.onNext(i);
}

我希望它记录如下内容:

I expected it to log something like:

Produce: 0
...
Produce: 9
Consume: 0
Consume: 9

相反,我得到

Produce: 0
...
Produce: 9
Consume: 0
Consume: 1
...
Consume: 9

onBackpressureLatest() 和 onBackpressureDrop() 都没有任何效果.只有 onBackpressureBuffer(i) 会导致异常.

onBackpressureLatest() and onBackpressureDrop() do both not have any effect. Only onBackpressureBuffer(i) causes an exception.

我使用 rxjava 2.1.9.任何想法问题/我的误解可能是什么?

I use rxjava 2.1.9. Any ideas what the problem/my misunderstanding could be?

推荐答案

observeOn 有一个内部缓冲区(默认 128 个元素),可以立即轻松获取所有源项目,因此 onBackpressureLatest 总是被完全消耗掉.

observeOn has an internal buffer (default 128 elements) that will pick up all source items easily immediately, thus the onBackpressureLatest is always fully consumed.

您可以创建的最小缓冲区是 1,它应该提供所需的模式:

The smallest buffer you can create is 1 which should provide the required pattern:

source.onBackpressureLatest()
      .observeOn(Schedulers.from(Executors.newCachedThreadPool()), false, 1)
      .subscribe(v -> { /* ... */ });

(之前的 delay + rebatchRequest 组合实际上等价于此).

(the earlier delay + rebatchRequest combination is practically equivalent to this).

这篇关于仅使用 onBackpressureLatest() 消耗最新项目的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆