使用Spring Kafka添加自定义标头 [英] Adding custom header using Spring Kafka

查看:87
本文介绍了使用Spring Kafka添加自定义标头的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我打算使用Spring Kafka客户端从Spring Boot应用程序中的kafka设置中消费和产生消息.我在此处中看到了对Kafka 0.11中自定义标头的支持.虽然它可供本地Kafka生产者和消费者使用,但我看不到在Spring Kafka中添加/读取自定义标头的支持.

I am planning to use the Spring Kafka client to consume and produce messages from a kafka setup in a Spring Boot application. I see support for custom headers in Kafka 0.11 as detailed here. While it is available for native Kafka producers and consumers, I don't see support for adding/reading custom headers in Spring Kafka.

我正在尝试基于重发计数为消息实现DLQ,我希望将重试计数存储在消息头中,而不必解析有效负载.

I am trying to implement a DLQ for messages based on a retry count that I was hoping to store in the message header without having to parse the payload.

推荐答案

Spring Kafka自 version 2.0 开始提供标头支持:

Well, Spring Kafka provides headers support since version 2.0: https://docs.spring.io/spring-kafka/docs/2.1.2.RELEASE/reference/html/_reference.html#headers

您可以拥有该 KafkaHeaderMapper 实例,并在通过 KafkaTemplate.send(Message<?>消息)发送它之前,使用它来填充 Message 的标头..或者,您可以使用普通的 KafkaTemplate.send(ProducerRecord< K,V>记录).

You can have that KafkaHeaderMapper instance and use it to populated headers to the Message before sending it via KafkaTemplate.send(Message<?> message). Or you can use the plain KafkaTemplate.send(ProducerRecord<K, V> record).

当您使用 KafkaMessageListenerContainer 接收记录时,可以通过注入到 RecordMessagingMessageListenerAdapter 中的 MessagingMessageConverter 在其中提供 KafkaHeaderMapper .>.

When you receive records using KafkaMessageListenerContainer, the KafkaHeaderMapper can be supplied there via a MessagingMessageConverter injected to the RecordMessagingMessageListenerAdapter.

因此,任何自定义标头都可以通过任何一种方式进行传输.

So, any custom headers can be transferred either way.

这篇关于使用Spring Kafka添加自定义标头的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆