手动确认消息:Spring Cloud Stream Kafka [英] Manual Acknowledgement of Messages : Spring Cloud Stream Kafka

查看:38
本文介绍了手动确认消息:Spring Cloud Stream Kafka的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想要实现的场景是消费来自 Kafka 的消息,处理它,如果某些条件失败,我不想确认该消息.为此,我在 spring 云流参考文档中找到了,

The scenario i want to implement is consume a message from Kafka , process it, if some condition fails i do not wish to acknowledge the message. For this i found in the spring cloud stream reference documentation,

自动提交偏移量处理消息后是否自动提交偏移量.如果设置为 false,则消息头中将提供一个确认头用于延迟确认.

autoCommitOffset Whether to autocommit offsets when a message has been processed. If set to false, an Acknowledgment header will be available in the message headers for late acknowledgment.

默认值:true.

我的问题是将 autoCommitOffset 设置为 false 后,我如何确认消息?非常感谢代码示例.

My question is after setting autoCommitOffset to false, how can i acknowledge a message? A Code example would be hugely appreciated.

推荐答案

我在这里提供了问题的答案 https://github.com/spring-cloud/spring-cloud-stream/issues/575

I've provided an answer to the question here https://github.com/spring-cloud/spring-cloud-stream/issues/575

基本上归结为设置<代码>spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false

然后处理确认头:

@SpringBootApplication
@EnableBinding(Sink.class)
   public class ManuallyAcknowdledgingConsumer {

      public static void main(String[] args) {
         SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
      }

      @StreamListener(Sink.INPUT)
      public void process(Message<?> message) {
         System.out.println(message.getPayload());
         Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
           System.out.println("Acknowledgment provided");
           acknowledgment.acknowledge();
        }
    }
}

这篇关于手动确认消息:Spring Cloud Stream Kafka的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆