如何从 Spring Cloud Stream 中 producer.send() 发送的异常中恢复 [英] How to recover from exceptions sent by producer.send() in Spring Cloud Stream
问题描述
我们经历了以下场景:
- 我们有一个由 3 个节点组成的 Kafka 集群,创建的每个主题有 3 个分区
- 通过
MessageChannel.send()
发送一条消息,为分区 1 生成一条记录 - 充当该分区的分区领导者的代理失败
默认情况下,MessageChannel.send()
返回 true
并且不会抛出任何异常,即使最终 KafkaProducer 无法成功发送消息.我们观察到,在此调用后约 30 秒,日志中出现以下消息:由于 30008 毫秒,helloworld-topic-1 已过期 10 条记录,加上延迟时间
>
在我们的例子中,这是不可接受的,因为我们必须确保所有消息最终都传递到 Kafka,在调用返回 MessageChannel.send()
的那一刻.
我们将 spring.cloud.stream.kafka.bindings.<channelName>.producer.sync
设为 true
,这与文档描述的完全一样.它阻止调用者对生产者对交付成功或失败的确认(MessageTimeoutException
、InterruptedException
、ExecutionException
),所有这些都受到控制通过 KafkaProducerMessageHandler
.这对我们来说似乎是最好的方法,因为在我们的情况下性能影响可以忽略不计.
但是,如果抛出异常,我们是否需要自己处理重试?(在我们的客户端代码中,例如 @Retryable
)
这是一个简单的实验项目:https://github.com/phdezann/spring-cloud-bus-kafka-helloworld
如果在 @StreamListener
线程上执行了 send()
并且将异常抛出回活页夹,活页夹重试配置将执行重试.
但是,由于您在 HTTP 线程上进行发送,因此您需要进行自己的重试(在 RetryTemplate()
范围内调用发送)或使控制器方法 @Retryable
.
We experienced the following scenario :
- We have a Kafka cluster composed of 3 nodes, each topic created has 3 partitions
- A message is sent through
MessageChannel.send()
, producing a record for, let's say, partition 1 - The broker acting as the partition leader for that partition fails
By default, MessageChannel.send()
returns true
and doesn't throw any exception, even if, eventually, the KafkaProducer can't send successfully the message. We observe, about 30 seconds after this call, the following message in the logs : Expiring 10 record(s) for helloworld-topic-1 due to 30008 ms has passed since batch creation plus linger time
In our case, this is not acceptable as we have to be sure that all messages are eventually delivered to Kafka, at the moment of the return of the call to MessageChannel.send()
.
We turned on spring.cloud.stream.kafka.bindings.<channelName>.producer.sync
to true
which does exactly as the documentation describes. It blocks the caller for the producer's acknowledgment of the success or the failure of the delivery (MessageTimeoutException
, InterruptedException
, ExecutionException
), all of this controlled by KafkaProducerMessageHandler
. It seems to be the best approach for us as the performance impact is negligible in our case.
But, do we need to take care of the retry ourselves if an exception is thrown ? (in our client code with @Retryable
for instance)
Here is a simple project to experiment : https://github.com/phdezann/spring-cloud-bus-kafka-helloworld
If the send()
is performed on the @StreamListener
thread and the exception is thrown back to the binder, the binder retry configuration will perform retries.
However, since you are doing the send on an HTTP thread you will need to do your own retry (call send within the scope of a RetryTemplate()
) or make the controller method @Retryable
.
这篇关于如何从 Spring Cloud Stream 中 producer.send() 发送的异常中恢复的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!