Automatic rate adjustment in Reactor

Question

Is there a way to automatically adjust delay between elements in Project Reactor based on downstream health?

I have an application that reads records from a Kafka topic, sends an HTTP request for each of them, and writes the result to another Kafka topic. Reading from and writing to Kafka is fast and easy, but the third-party HTTP service is easily overwhelmed, so I use delayElements() with a value from a property file, which means the value does not change during the application's runtime. Here's a code sample:

kafkaReceiver.receiveAutoAck()
            .concatMap(identity())
            .delayElements(ofMillis(delayElement))
            .flatMap(message -> recordProcessingFunction.process(message.value()), messageRate)
            .onErrorContinue(handleError())
            .map(this::getSenderRecord)
            .flatMap(kafkaSender::send)

However, the third-party service might perform differently over time, and I'd like to be able to adjust this delay accordingly. Let's say, if I see that over 5% of requests fail over a 10-second period, I would increase the delay. If it stays below 5% for 10 seconds, I would reduce the delay again.

Is there an existing mechanism for that in Reactor? I can think of some creative solutions on my side, but I was wondering whether they (or someone else) have already implemented that.
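
For illustration, a hand-rolled sketch of that policy (the names currentDelayMs and onFailureRate, the 100 ms starting delay, and the doubling/halving bounds are all placeholders, not an existing Reactor mechanism) could swap delayElements() for delayUntil(), which re-evaluates its delay for every element:

// currentDelayMs is read per element, so a separate task can adjust it at runtime
AtomicLong currentDelayMs = new AtomicLong(100);

kafkaReceiver.receiveAutoAck()
            .concatMap(identity())
            // unlike delayElements(), the delay here is re-read for each element
            .delayUntil(message -> Mono.delay(Duration.ofMillis(currentDelayMs.get())))
            .flatMap(message -> recordProcessingFunction.process(message.value()), messageRate)
            .onErrorContinue(handleError())
            .map(this::getSenderRecord)
            .flatMap(kafkaSender::send);

// invoked every 10 seconds by a separate task that samples the recent failure rate
void onFailureRate(double failureRate) {
    if (failureRate > 0.05) {
        currentDelayMs.updateAndGet(d -> Math.min(d * 2, 10_000));  // back off
    } else {
        currentDelayMs.updateAndGet(d -> Math.max(d / 2, 10));      // speed up again
    }
}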

Answer

You can add a retry with exponential backoff. Something like this:

influx()
    .flatMap(x -> Mono.just(x)
            .map(data -> apiCall(data))
            .retryWhen(
                    // effectively unbounded retries, waiting at least 30s between attempts
                    Retry.backoff(Integer.MAX_VALUE, Duration.ofSeconds(30))
                        .filter(err -> err instanceof RuntimeException)
                        .doBeforeRetry(
                            s -> log.warn("Retrying for err {}", s.failure().getMessage()))
                        .onRetryExhaustedThrow((spec, sig) -> new RuntimeException("ex")))
            // swallow any remaining error so the outer flux keeps going
            .onErrorResume(err -> Mono.empty()),
        concurrency_val,
        prefetch_val)

This will retry a failed request up to Integer.MAX_VALUE times, with a minimum of 30 seconds between retries. The delay grows exponentially on successive retries and is additionally offset by a configurable jitter factor (default value = 0.5).

The documentation for Retry.backoff says:

A RetryBackoffSpec preconfigured for exponential backoff strategy with jitter, given a maximum number of retry attempts and a minimum Duration for the backoff.
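
For reference, a minimal sketch of tuning the spec further (the 5 attempts and the 5-minute cap below are illustrative values, not from the original answer); the jitter factor and an upper bound on the backoff can be set explicitly:

RetryBackoffSpec retrySpec = Retry.backoff(5, Duration.ofSeconds(30))  // at most 5 attempts, first backoff of at least 30s
        .maxBackoff(Duration.ofMinutes(5))                             // cap the exponential growth
        .jitter(0.5)                                                   // the default jitter factor, made explicit
        .filter(err -> err instanceof RuntimeException);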

Also, since the whole operation is mapped inside flatMap, you can vary its default concurrency and prefetch values to account for the maximum number of requests that can be failing at any given time while the rest of the pipeline waits for the RetryBackoffSpec to complete successfully.

Worst case, your concurrency_val number of requests have all failed and are waiting 30+ seconds for a retry to happen. The whole pipeline might stall (still waiting for success from downstream), which may not be desirable if the downstream system doesn't recover in time. It is better to replace the Integer.MAX_VALUE backoff limit with something manageable, beyond which you would just log the error and proceed with the next event.
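
A minimal sketch of that bounded variant, assuming a placeholder limit of 5 attempts and reusing the concurrency_val/prefetch_val names from the snippet above:

influx()
    .flatMap(x -> Mono.just(x)
            .map(data -> apiCall(data))
            .retryWhen(
                    // bounded: give up after 5 attempts instead of Integer.MAX_VALUE
                    Retry.backoff(5, Duration.ofSeconds(30))
                        .filter(err -> err instanceof RuntimeException)
                        .onRetryExhaustedThrow((spec, sig) -> sig.failure()))
            // log the final failure and drop the element so the pipeline moves on
            .onErrorResume(err -> {
                log.error("Giving up on request: {}", err.getMessage());
                return Mono.empty();
            }),
        concurrency_val,
        prefetch_val)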
