手动确认消息:Spring Cloud Stream Kafka [英] Manual Acknowledgement of Messages : Spring Cloud Stream Kafka
问题描述
我要实现的方案是消耗来自Kafka的消息,对其进行处理,如果某些条件失败,我不希望确认该消息.为此,我在Spring Cloud Stream参考文档中找到了
The scenario i want to implement is consume a message from Kafka , process it, if some condition fails i do not wish to acknowledge the message. For this i found in the spring cloud stream reference documentation,
autoCommitOffset处理消息后是否自动提交偏移量.如果设置为false,则消息头中将提供一个Acknowledgment标头,以供以后确认.
autoCommitOffset Whether to autocommit offsets when a message has been processed. If set to false, an Acknowledgment header will be available in the message headers for late acknowledgment.
默认:true.
我的问题是将autoCommitOffset设置为false后,如何确认消息?一个代码示例将不胜感激.
My question is after setting autoCommitOffset to false, how can i acknowledge a message? A Code example would be hugely appreciated.
推荐答案
我在从本质上讲,它取决于设置spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset = false
Essentially it comes down to setting
spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false
,然后处理确认标头:
@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@StreamListener(Sink.INPUT)
public void process(Message<?> message) {
System.out.println(message.getPayload());
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
}
}
这篇关于手动确认消息:Spring Cloud Stream Kafka的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!