Reactor Flux replay(int history) 方法未按预期工作 [英] Reactor Flux replay(int history) method not working as expected

查看:60
本文介绍了Reactor Flux replay(int history) 方法未按预期工作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用具有以下特征的 Project Reactor 制作 Flux 的示例:

I'm trying to make an example of a Flux with Project Reactor that has the following characteristics:

  • 单个热 observable,每秒发出一个项目.
  • 两个订阅者,每个订阅者都使用发布者的一个单独线程.
  • 调用 replay() 时的有限历史记录,因此如果其中一个订阅者太慢,某些项目将被错过.
  • A single hot observable, which emits one item per second.
  • Two subscribers, each of them using a separate thread of the publisher.
  • A limited history when calling replay(), so some items will be missed in case one of the subscribers is too slow.

然后我编码了这个示例:

Then I coded this sample:

import java.time.Duration;

import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class FluxTest {

  public static void main(String[] args) {
    final ConnectableFlux<Integer> publisher = Flux.range(1, 20)
      .delayElements(Duration.ofSeconds(1))
      .replay(8);

    publisher.publishOn(Schedulers.newSingle("fast"))
      .subscribe(i -> {
        System.out.println("Fast subscriber - Received " + i);
        sleep(1);
      });

    publisher.publishOn(Schedulers.newSingle("slow"))
      .subscribe(i -> {
        System.out.println("Slow subscriber - Received " + i);
        sleep(5);
      });

    publisher.connect();
  }

  private static void sleep(int seconds) {
    try {
      Thread.sleep(seconds * 1000L);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

根据文档replay(int history) 方法,我预计,几秒钟后,第二个消费者(慢的)会开始失去轨道,但事实并非如此.以控制台输出的这部分为例:

According to the documentation of replay(int history) method, I'd expect that, after few seconds, the second consumer (the slow one) would start losing the track, but it doesn't. See this part of the console output as an example:

...
Fast subscriber - Received 14
Fast subscriber - Received 15
Slow subscriber - Received 4
Fast subscriber - Received 16
Fast subscriber - Received 17

我希望慢速订阅者无法接收 4 因为该元素不应再出现在历史中(15 - 8 = 7,这应该是最后一个).

I expected the slow subscriber not to be able to receive 4 since that element should no longer be in history (15 - 8 = 7, that should be the last one).

请注意,如果我使用方法 replay(8, Duration.ofSeconds(8)) 那么我会得到我所期望的:

Note that, if I use the method replay(8, Duration.ofSeconds(8)) then I get what I'd expect:

...
Fast subscriber - Received 14
Fast subscriber - Received 15
Slow subscriber - Received 8
Fast subscriber - Received 16
Fast subscriber - Received 17

我想我在这里遗漏了一些重要的东西,但我不知道它是什么.

I think I'm missing something important here, but I don't know what it is.

推荐答案

replay(8) 能够重放订阅者之前发出的 8 个元素订阅了.对于之后进来的元素,它们会直接转发给订阅者.在这里,您在连接之前订阅 slow,因此重放缓冲区的大小并不重要.

replay(8) is capable of replaying 8 elements that where emitted before the subscriber subscribed. For elements that come in after, they are directly relayed to the subscriber. Here you subscribe slow before you connect, so the size of the replay buffer doesn't really matter.

您的慢速订阅者在专用线程上休眠,所以 publishOn 确实接收到所有数据,将其放入内部 Queue 并在slow 线程来排空该队列,在排空循环中每次迭代都会被阻塞 5 秒.

Your slow subscriber sleeps on a dedicated thread, so what happens is that publishOn does receive all of the data, puts it in an internal Queue and schedules itself on the slow thread to drain that queue, in a drain loop that gets blocked for 5s on each iteration.

不过,操作员已经看到并能够处理所有数据.

Still, the operator has seen and is capable of processing all of the data.

这篇关于Reactor Flux replay(int history) 方法未按预期工作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆