如何从 Spring Cloud Stream 中 producer.send() 发送的异常中恢复 [英] How to recover from exceptions sent by producer.send() in Spring Cloud Stream

查看:33
本文介绍了如何从 Spring Cloud Stream 中 producer.send() 发送的异常中恢复的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们经历了以下场景:

  • 我们有一个由 3 个节点组成的 Kafka 集群,创建的每个主题有 3 个分区
  • 通过 MessageChannel.send() 发送一条消息,为分区 1 生成一条记录
  • 充当该分区的分区领导者的代理失败

默认情况下,MessageChannel.send() 返回 true 并且不会抛出任何异常,即使最终 KafkaProducer 无法成功发送消息.我们观察到,在此调用后约 30 秒,日志中出现以下消息:由于 30008 毫秒,helloworld-topic-1 已过期 10 条记录,加上延迟时间>

在我们的例子中,这是不可接受的,因为我们必须确保所有消息最终都传递到 Kafka,在调用返回 MessageChannel.send() 的那一刻.

我们将 spring.cloud.stream.kafka.bindings.<channelName>.producer.sync 设为 true,这与文档描述的完全一样.它阻止调用者对生产者对交付成功或失败的确认(MessageTimeoutExceptionInterruptedExceptionExecutionException),所有这些都受到控制通过 KafkaProducerMessageHandler.这对我们来说似乎是最好的方法,因为在我们的情况下性能影响可以忽略不计.

但是,如果抛出异常,我们是否需要自己处理重试?(在我们的客户端代码中,例如 @Retryable)

这是一个简单的实验项目:https://github.com/phdezann/spring-cloud-bus-kafka-helloworld

解决方案

如果在 @StreamListener 线程上执行了 send() 并且将异常抛出回活页夹,活页夹重试配置将执行重试.

但是,由于您在 HTTP 线程上进行发送,因此您需要进行自己的重试(在 RetryTemplate() 范围内调用发送)或使控制器方法 @Retryable.

We experienced the following scenario :

  • We have a Kafka cluster composed of 3 nodes, each topic created has 3 partitions
  • A message is sent through MessageChannel.send(), producing a record for, let's say, partition 1
  • The broker acting as the partition leader for that partition fails

By default, MessageChannel.send() returns true and doesn't throw any exception, even if, eventually, the KafkaProducer can't send successfully the message. We observe, about 30 seconds after this call, the following message in the logs : Expiring 10 record(s) for helloworld-topic-1 due to 30008 ms has passed since batch creation plus linger time

In our case, this is not acceptable as we have to be sure that all messages are eventually delivered to Kafka, at the moment of the return of the call to MessageChannel.send().

We turned on spring.cloud.stream.kafka.bindings.<channelName>.producer.sync to true which does exactly as the documentation describes. It blocks the caller for the producer's acknowledgment of the success or the failure of the delivery (MessageTimeoutException, InterruptedException, ExecutionException), all of this controlled by KafkaProducerMessageHandler. It seems to be the best approach for us as the performance impact is negligible in our case.

But, do we need to take care of the retry ourselves if an exception is thrown ? (in our client code with @Retryable for instance)

Here is a simple project to experiment : https://github.com/phdezann/spring-cloud-bus-kafka-helloworld

解决方案

If the send() is performed on the @StreamListener thread and the exception is thrown back to the binder, the binder retry configuration will perform retries.

However, since you are doing the send on an HTTP thread you will need to do your own retry (call send within the scope of a RetryTemplate()) or make the controller method @Retryable.

这篇关于如何从 Spring Cloud Stream 中 producer.send() 发送的异常中恢复的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆