Spring Web-Flux中的背压机制 [英] Backpressure mechanism in Spring Web-Flux

查看:442
本文介绍了Spring Web-Flux中的背压机制的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是 Spring Web-Flux 的入门者.我编写了一个控制器,如下所示:

I'm a starter in Spring Web-Flux. I wrote a controller as follows:

@RestController
public class FirstController 
{
    @GetMapping("/first")
    public Mono<String> getAllTweets() 
    {
        return Mono.just("I am First Mono")
    }
}

我知道被动的好处之一是背压,它可以平衡请求或响应速度.我想了解如何在 Spring Web-Flux 中使用反压机制.

I know one of the reactive benefits is Backpressure, and it can balance the request or the response rate. I want to realize how to have backpressure mechanism in Spring Web-Flux.

推荐答案

WebFlux中的反压

为了了解Backpressure在WebFlux框架的当前实现中是如何工作的,我们必须在此处重述默认情况下使用的传输层.我们可能还记得,浏览器和服务器之间的正常通信(服务器之间的通信通常也相同)是通过TCP连接完成的. WebFlux还使用该传输方式在客户端和服务器之间进行通信. 然后,为了获得 backpressure control 术语的含义,我们必须从Reactive Streams规范角度回顾一下backpressure的含义.

Backpressure in WebFlux

In order to understand how Backpressure works in the current implementation of the WebFlux framework, we have to recap the transport layer used by default here. As we may remember, the normal communication between browser and server (server to server communication usually the same as well) is done through the TCP connection. WebFlux also uses that transport for communication between a client and the server. Then, in order to get the meaning of the backpressure control term, we have to recap what backpressure means from the Reactive Streams specification perspective.

基本语义定义了如何通过背压调节流元素的传输.

The basic semantics define how the transmission of stream elements is regulated through back-pressure.

因此,从该陈述中,我们可以得出结论,在反应流中,背压是一种通过传输(通知)接收者可以消耗多少元素来调节需求的机制.在这里,我们有一个棘手的问题. TCP具有字节抽象,而不是逻辑元素抽象.我们通常希望通过说背压控制来控制发送到网络或从网络接收的逻辑元素的数量.即使TCP具有自己的流控制(请参见此处和动画),此流控制仍然是针对字节而不是逻辑的元素.

So, from that statement, we may conclude that in Reactive Streams the backpressure is a mechanism that regulates the demand through the transmission (notification) of how many elements recipient can consume; And here we have a tricky point. The TCP has a bytes abstraction rather than logical elements abstraction. What we usually want by saying backpressure control is the control of the number of logical elements sent/received to/from the network. Even though the TCP has its own flow control (see the meaning here and animation there) this flow control is still for bytes rather than for logical elements.

在WebFlux模块的当前实现中,背压由传输流控制来调节,但它并未暴露出接收者的实际需求.为了最终看到交互流程,请参见下图:

In the current implementation of the WebFlux module, the backpressure is regulated by the transport flow control, but it does not expose the real demand of the recipient. In order to finally see the interaction flow, please see the following diagram:

为简单起见,上图显示了两个微服务之间的通信,其中左一个发送数据流,而右一个使用该流.以下编号列表提供了对该图的简要说明:

For simplicity, the above diagram shows the communication between two microservices where the left one sends streams of data, and the right one consumes that stream. The following numbered list provides a brief explanation of that diagram:

  1. 这是WebFlux框架,可以适当地将逻辑元素转换为字节,然后将其返回并从TCP(网络)传输/接收.
  2. 这是元素长期运行的开始,一旦作业完成,该元素就会请求下一个元素.
  3. 在这里,尽管没有业务逻辑的需求,但是WebFlux排队来自网络的字节而没有得到确认(业务逻辑没有需求).
  4. 由于TCP流量控制的性质,服务A仍可以将数据发送到网络.

从上图中我们可以注意到,接收者暴露的需求与发送者的需求不同(此处以逻辑元素的需求).这意味着两者的需求是隔离的,并且仅对WebFlux<->业务逻辑(Service)交互有效,并且对于Service A<-> Service B交互而言,暴露的背压较小.

As we may notice from the diagram above, the demand exposed by the recipient is different from the demand of the sender (demand here in logical elements). It means that the demand of both is isolated and works only for WebFlux <-> Business logic (Service) interaction and exposes less the backpressure for Service A <-> Service B interaction.

所有这些都意味着WebFlux中的背压控制并不像我们期望的那样公平.

All that means that the backpressure control is not that fair in WebFlux as we expect.

如果我们仍然希望对WebFlux中的反压进行不公平的控制,则可以在Project Reactor运算符(例如

If we still want to have an unfair control of backpressure in WebFlux, we may do that with the support of Project Reactor operators such as limitRate(). The following example shows how we may use that operator:

@PostMapping("/tweets")
public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {

    return tweetService.process(tweetsFlux.limitRate(10))
                       .then();
}

从示例中可以看出,

运算符limitRate()允许定义一次要预取的元素数量.这意味着即使最终订户请求Long.MAX_VALUE元素,limitRate运算符也会将该需求拆分为多个块,并且不允许一次消耗更多.我们可以对元素发送过程做同样的事情:

As we may see from the example, limitRate() operator allows defining the number of elements to be prefetched at once. That means that even if the final subscriber requests Long.MAX_VALUE elements, the limitRate operator split that demand into chunks and does not allow to consume more than that at once. The same we may do with elements sending process:

@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {

    return tweetService.retreiveAll()
                       .limitRate(10);
}

以上示例显示,即使WebFlux一次请求的元素数量超过10个,limitRate()也会将需求限制为预取大小,并防止一次消耗超过指定数量的元素.

The above example shows that even if WebFlux requests more then 10 elements at a time, the limitRate() throttles the demand to the prefetch size and prevents to consume more than the specified number of elements at once.

另一个选择是实现自己的Subscriber或从Project Reactor扩展BaseSubscriber.例如,以下是我们如何做到这一点的幼稚示例:

Another option is to implement own Subscriber or extend the BaseSubscriber from Project Reactor. For instance, The following is a naive example of how we may do that:

class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {

    int consumed;
    final int limit = 5;

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        request(limit);
    }

    @Override
    protected void hookOnNext(T value) {
        // do business logic there 

        consumed++;

        if (consumed == limit) {
            consumed = 0;

            request(limit);
        }
    }
}

使用RSocket协议的合理背压

为了通过网络边界实现逻辑元素背压,我们需要适当的协议.幸运的是,有一个名为 RScoket协议. RSocket是一种应用程序级别的协议,允许通过网络边界传输实际需求. 该协议有一个RSocket-Java实现,可用于设置RSocket服务器.对于服务器到服务器的通信,相同的RSocket-Java库也提供了客户端实现.要了解有关如何使用RSocket-Java的更多信息,请参见以下示例 RSocket-JS 实现,该实现允许在浏览器之间连接流通信和服务器通过WebSocket.

Fair backpressure with RSocket Protocol

In order to achieve logical-elements backpressure through the network boundaries, we need an appropriate protocol for that. Fortunately, there is one called RScoket protocol. RSocket is an application-level protocol that allows transferring real demand through the network boundaries. There is an RSocket-Java implementation of that protocol that allows to set up an RSocket server. In the case of a server to server communication, the same RSocket-Java library provides a client implementation as well. To learn more how to use RSocket-Java, please see the following examples here. For browser-server communication, there is an RSocket-JS implementation which allows wiring the streaming communication between browser and server through WebSocket.

如今,有一些框架是基于RSocket协议构建的.

Nowadays there are a few frameworks, built on top of the RSocket protocol.

其中一个框架是Proteus项目,该项目提供在RSocket之上构建的成熟的微服务.而且,Proteus与Spring框架很好地集成在一起,因此现在我们可以实现合理的背压控制(请参见示例)

One of the frameworks is a Proteus project which offers full-fledged microservices built on top of RSocket. Also, Proteus is well integrated with Spring framework so now we may achieve a fair backpressure control (see examples there)

  • https://www.netifi.com/proteus
  • https://medium.com/netifi
  • http://scalecube.io/

这篇关于Spring Web-Flux中的背压机制的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆