使用Spring Kafka添加自定义标头 [英] Adding custom header using Spring Kafka
问题描述
我打算使用Spring Kafka客户端从Spring Boot应用程序中的kafka设置中消费和产生消息.我在此处中看到了对Kafka 0.11中自定义标头的支持.虽然它可供本地Kafka生产者和消费者使用,但我看不到在Spring Kafka中添加/读取自定义标头的支持.
I am planning to use the Spring Kafka client to consume and produce messages from a kafka setup in a Spring Boot application. I see support for custom headers in Kafka 0.11 as detailed here. While it is available for native Kafka producers and consumers, I don't see support for adding/reading custom headers in Spring Kafka.
我正在尝试基于重发计数为消息实现DLQ,我希望将重试计数存储在消息头中,而不必解析有效负载.
I am trying to implement a DLQ for messages based on a retry count that I was hoping to store in the message header without having to parse the payload.
推荐答案
Spring Kafka自 version 2.0 开始提供标头支持:
Well, Spring Kafka provides headers support since version 2.0: https://docs.spring.io/spring-kafka/docs/2.1.2.RELEASE/reference/html/_reference.html#headers
您可以拥有该 KafkaHeaderMapper
实例,并在通过 KafkaTemplate.send(Message<?>消息)发送它之前,使用它来填充
.或者,您可以使用普通的 Message
的标头. KafkaTemplate.send(ProducerRecord< K,V>记录)
.
You can have that KafkaHeaderMapper
instance and use it to populated headers to the Message
before sending it via KafkaTemplate.send(Message<?> message)
. Or you can use the plain KafkaTemplate.send(ProducerRecord<K, V> record)
.
当您使用 KafkaMessageListenerContainer
接收记录时,可以通过注入到 RecordMessagingMessageListenerAdapter
中的 MessagingMessageConverter
在其中提供 KafkaHeaderMapper
.>.
When you receive records using KafkaMessageListenerContainer
, the KafkaHeaderMapper
can be supplied there via a MessagingMessageConverter
injected to the RecordMessagingMessageListenerAdapter
.
因此,任何自定义标头都可以通过任何一种方式进行传输.
So, any custom headers can be transferred either way.
这篇关于使用Spring Kafka添加自定义标头的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!