默认情况下,Spring Reactor中的订阅者是否不受限制? [英] Are subscribers in Spring Reactor unbounded by default?

查看:333
本文介绍了默认情况下,Spring Reactor中的订阅者是否不受限制?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我一直在Spring Reactor中工作,并且之前进行过一些测试,这些使我感到奇怪,默认情况下,助焊剂如何处理背压.我知道onBackpressureBuffer等存在,并且我还阅读了解决方案

我将分享我对该主题的了解,这可能对您的问题来说是太多了,但我希望它对刚刚熟悉背压的其他人有用如果我对这个概念有误解,可能会在评论中得到更正.

什么是背压?

背压或消费者向生产者发出信号的能力 排放率太高-反应堆参考

当我们谈论背压时,我们必须将源/发布者分为两组:尊重订阅者需求的群体和忽略订阅者需求的群体.

热资源通常不尊重订阅者的需求,因为它们经常产生实时数据,例如收听Twitter提要.在此示例中,订户无法控制创建鸣叫的速率,因此很容易使其不知所措.

另一方面,当订阅发生时,冷源通常会按需生成数据,因此自然会尊重来自下游的请求.

重要的是,这不是一条规则:并非每个热源都忽略需求,也不是每个冷源都尊重需求.您可以在此处上阅读更多有关冷热资源的信息.. >

让我们看一些可能有助于理解的示例.

1.尊重需求的发布商

给出一个磁通,该磁通产生从1到Integer.MAX_VALUE的数字,并给出一个处理步骤,该处理步骤需要100毫秒来处理单个元素:

 Flux.range(1, Integer.MAX_VALUE)
    .log()
    .concatMap(x -> Mono.delay(Duration.ofMillis(100)), 1) // simulate that processing takes time
    .blockLast();
 

让我们看看日志:

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(1)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(2)
[ INFO] (parallel-1) | request(1)
[ INFO] (parallel-1) | onNext(3)
[ INFO] (parallel-2) | request(1)
[ INFO] (parallel-2) | onNext(4)
[ INFO] (parallel-3) | request(1)
[ INFO] (parallel-3) | onNext(5)

我们可以看到,在每个onNext之前都有一个请求.请求信号由concatMap运算符发送.当concatMap完成实际元素并准备接受下一个元素时,会发出信号.源仅在收到来自下游的请求时才发送下一个项目.

在此示例中,背压是自动的,我们不需要定义任何策略,因为操作员知道它可以处理的内容,并且源代码会尊重它.

2.忽略需求且未定义背压策略的发布者

为简单起见,在此示例中,我选择了一个易于理解的冷发行商.它是 Flux.间隔.有道理的是,这个冷淡的发布者不尊重需求,因为看到以比最初指定的间隔更长的不同间隔发出的项目是很奇怪的.

让我们看看代码:

 Flux.interval(Duration.ofMillis(1))
    .log()
    .concatMap(x -> Mono.delay(Duration.ofMillis(100)))
    .blockLast();
 

源每毫秒发射一项.订户能够每100毫秒处理一项.显然,订户无法跟上制作人的步伐,我们很快就会收到类似这样的异常:

reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    ...

如何避免这种例外情况?

3.定义了忽略需求和背压策略的发布者

默认的背压策略是我们在上面看到的策略:因错误而终止. Reactor对我们不执行任何错误处理策略.当我们看到这种错误时,我们可以决定哪种错误最适合我们的用例.

您可以在反应堆参考中找到其中的几个.

在此示例中,我们将使用最简单的示例:onBackpressureDrop.

 Flux.interval(Duration.ofMillis(1))
    .onBackpressureDrop()
    .concatMap(a -> Mono.delay(Duration.ofMillis(100)).thenReturn(a))
    .doOnNext(a -> System.out.println("Element kept by consumer: " + a))
    .blockLast();
 

输出:

Element kept by consumer: 0
Element kept by consumer: 1
Element kept by consumer: 2
Element kept by consumer: 3
Element kept by consumer: 4
Element kept by consumer: 5
Element kept by consumer: 6
Element kept by consumer: 7
Element kept by consumer: 8
Element kept by consumer: 9
Element kept by consumer: 10
Element kept by consumer: 11
Element kept by consumer: 12
Element kept by consumer: 13
Element kept by consumer: 14
Element kept by consumer: 15
Element kept by consumer: 16
Element kept by consumer: 17
Element kept by consumer: 18
Element kept by consumer: 19
Element kept by consumer: 20
Element kept by consumer: 21
Element kept by consumer: 22
Element kept by consumer: 23
Element kept by consumer: 24
Element kept by consumer: 25
Element kept by consumer: 26
Element kept by consumer: 27
Element kept by consumer: 28
Element kept by consumer: 29
Element kept by consumer: 30
Element kept by consumer: 31
Element kept by consumer: 2399
Element kept by consumer: 2400
Element kept by consumer: 2401
Element kept by consumer: 2402
Element kept by consumer: 2403
Element kept by consumer: 2404
Element kept by consumer: 2405
Element kept by consumer: 2406
Element kept by consumer: 2407

我们可以看到,在前32个项目之后,跳到2400的幅度很大.由于定义了策略,因此删除了之间的元素.

要点

  • 反压通常是自动的,我们不需要做任何事情,因为我们可以按需获取数据.
  • 对于不符合订户需求的消息源,我们需要定义一种避免终止错误的策略.

更新: 有用的读物​​:如何控制请求速率

I've been working in Spring Reactor and had some previous testing that made me wonder how Fluxes handle backpressure by default. I know that onBackpressureBuffer and such exist, and I have also read that RxJava defaults to unbounded until you define whether to buffer, drop, etc.

So, can anyone clarify for me: What is the default backpressure behavior for a Flux in Reactor 3?

I tried searching for the answer but didn't find any clear answers, only definitions of Backpressure or that answer linked above for RxJava

解决方案

I will share what I know about this topic which might be too much for your question but I hope it would be useful for others who are just getting familiar with backpressure and I might get corrections in comments if I misunderstand something about this concept.

What is backpressure?

Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high - Reactor Reference

When we are talking about backpressure we have to separate sources/publishers into two groups: the ones that respect the demand from the subscriber, and those that ignore it.

Generally hot sources do not respect subscriber demand, since they often produce live data, like listening into a Twitter feed. In this example the subscriber doesn't have control over at what rate tweets are created, so it could get easily overwhelmed.

On the other hand a cold source usually generates data on demand when subscription happens, so it's natural that it respects the request from the downstream.

Important to note that this is not a rule: not every hot source ignores the demand and not every cold source respects it. You can read more on hot and cold sources here.

Let's look at some examples that might help in understanding.

1. Publisher that respects the demand

Given a Flux that produces numbers from 1 to Integer.MAX_VALUE and given a processing step that takes 100ms to process a single element:

Flux.range(1, Integer.MAX_VALUE)
    .log()
    .concatMap(x -> Mono.delay(Duration.ofMillis(100)), 1) // simulate that processing takes time
    .blockLast();

Let's see the logs:

[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(1)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(2)
[ INFO] (parallel-1) | request(1)
[ INFO] (parallel-1) | onNext(3)
[ INFO] (parallel-2) | request(1)
[ INFO] (parallel-2) | onNext(4)
[ INFO] (parallel-3) | request(1)
[ INFO] (parallel-3) | onNext(5)

We can see that before every onNext there is a request. The request signal is sent by concatMap operator. It is signaled when concatMap finished the actual element and ready to accept the next one. The source only sends the next item when it receives a request from the downstream.

In this example backpressure is automatic, we don't need to define any strategy because the operator knows what it can handle and the source respects it.

2. Publisher that ignores the demand and no backpressure strategy is defined

For the sake of simplicity I selected an easy to understand cold publisher for this example. It's Flux.interval. It makes sense that this cold publisher does not respect demand since it would be quite strange to see items emitted by different, longer intervals than the one originally specified.

Let's see the code:

Flux.interval(Duration.ofMillis(1))
    .log()
    .concatMap(x -> Mono.delay(Duration.ofMillis(100)))
    .blockLast();

Source emits one item every millisecond. Subscriber is able to process one item every 100 milliseconds. It's clear that the subscriber is not able to keep up with the producer and we get an exception something like this quite soon:

reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
    ...

What can we do to avoid this exception?

3. Publisher that ignores the demand and backpressure strategy is defined

The default backpressure strategy is the one we have seen above: terminating with error. Reactor does not enforce any error handling strategy on us. When we see this kind of error we can decide which one is the most applicable for our use case.

You can find a couple of them in Reactor reference.

For this example we will use the simplest one: onBackpressureDrop.

Flux.interval(Duration.ofMillis(1))
    .onBackpressureDrop()
    .concatMap(a -> Mono.delay(Duration.ofMillis(100)).thenReturn(a))
    .doOnNext(a -> System.out.println("Element kept by consumer: " + a))
    .blockLast();

Output:

Element kept by consumer: 0
Element kept by consumer: 1
Element kept by consumer: 2
Element kept by consumer: 3
Element kept by consumer: 4
Element kept by consumer: 5
Element kept by consumer: 6
Element kept by consumer: 7
Element kept by consumer: 8
Element kept by consumer: 9
Element kept by consumer: 10
Element kept by consumer: 11
Element kept by consumer: 12
Element kept by consumer: 13
Element kept by consumer: 14
Element kept by consumer: 15
Element kept by consumer: 16
Element kept by consumer: 17
Element kept by consumer: 18
Element kept by consumer: 19
Element kept by consumer: 20
Element kept by consumer: 21
Element kept by consumer: 22
Element kept by consumer: 23
Element kept by consumer: 24
Element kept by consumer: 25
Element kept by consumer: 26
Element kept by consumer: 27
Element kept by consumer: 28
Element kept by consumer: 29
Element kept by consumer: 30
Element kept by consumer: 31
Element kept by consumer: 2399
Element kept by consumer: 2400
Element kept by consumer: 2401
Element kept by consumer: 2402
Element kept by consumer: 2403
Element kept by consumer: 2404
Element kept by consumer: 2405
Element kept by consumer: 2406
Element kept by consumer: 2407

We can see that after the first 32 items there is a quite big skip to 2400. The elements between are dropped due to the defined strategy.

Key takeaways

  • Backpressure is often automatic and we don't need to do anything since we get data on demand.
  • In case of sources which do not respect subscriber demand we need to define a strategy to avoid terminating error.

UPDATE: Useful read: How to control request rate

这篇关于默认情况下,Spring Reactor中的订阅者是否不受限制?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆