使用 @SendTo 转发 spring @KafkaListener 结果 [英] Forwarding spring @KafkaListener result with @SendTo
问题描述
我正在创建一个消息处理器,它接收来自一个 kafka 主题的事件,对其进行处理并将结果转发到另一个主题.
我使用 @SendTo
创建了一个 @KafkaListener
方法,该方法可以正常工作,直到我想控制传出消息的密钥生成.
文档 (2.2.4.RELEASE) 建议通过子类化 KafkaTemplate
创建一个 bean,并覆盖它的 send(String topic, String data)
方法.
不幸的是,这不起作用,因为在我的情况下没有调用该方法.send(Message> message)
另一方面被调用,但这没有帮助.经过短暂的调试,结果证明 MessagingMessageListenerAdapter
调用此方法,以防输入是 org.springframework.messaging.Message
的实例而结果不是 List代码>.不幸的是,
RecordMessagingMessageListenerAdapter
总是将 input 转换为 Message
.我是否将这个注解组合用于不是 spring kafka 的作者的意图,这是一个错误还是文档错误?
此外,仅当我不创建自己的 KafkaTemplate
bean 时,spring boot 自动配置才起作用,这很烦人.如果我创建了覆盖的模板,那么我必须自己创建 KafkaListenerContainerFactory 并设置回复模板以使 @SendTo
再次工作.
这是我的示例代码.尽可能简单.
@SpringBootApplication@Slf4j公共类 SpringKafkaExampleApplication {公共静态无效主(字符串 [] args){SpringApplication.run(SpringKafkaExampleApplication.class, args);}@KafkaListener(topics = "${example.topics.input}")@SendTo("${example.topics.output}")公共字符串过程(最终字节[]有效载荷){String message = new String(payload, StandardCharsets.UTF_8);日志信息(消息);回消息;}/*//将我的自定义KafkaTemplate设置为replyTemplate@豆public ConcurrentKafkaListenerContainerFactorykafkaListenerContainerFactory(KafkaTemplate kafkaTemplate,ConsumerFactory消费者工厂) {ConcurrentKafkaListenerContainerFactory工厂 =新的 ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.setReplyTemplate(kafkaTemplate);返厂;}//我的 KafkaTemplate 带有覆盖的 send(topic, data) 方法@豆公共 KafkaTemplatekafkaTempate(ProducerFactoryproducerFactory) {返回新的 KafkaTemplate(producerFactory) {@覆盖public ListenableFuture>发送(字符串主题,字符串数据){return super.send(topic, "some_generated_key", data);}};}*/}
更新
以 send(Message) 结尾的堆栈跟踪
send:215, KafkaTemplate (org.springframework.kafka.core)sendReplyForMessageSource:449, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)sendSingleResult:416, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)sendResponse:402, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)handleResult:324, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)onMessage:81, RecordMessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)onMessage:50, RecordMessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)
RecordMessagingMessageListenerAdapter
这里将接收到的记录转换为 Message 对象.
@Overridepublic void onMessage(ConsumerRecord record, Acknowledgement Acknowledgement, Consumer, ?> consumer) {消息>message = toMessagingMessage(记录、确认、消费者);如果 (logger.isDebugEnabled()) {logger.debug("正在处理 [" + 消息 + "]");}试试{对象结果 = invokeHandler(记录、确认、消息、消费者);如果(结果!= null){处理结果(结果,记录,消息);}}
MessagingMessageListenerAdapter
String
由 KafkaListener 方法返回,因此 sendSingleResult(result, topic, source)
将被调用.
protected void sendResponse(Object result, String topic, @Nullable Object source, boolean messageReturnType) {if (!messageReturnType && topic == null) {如果 (this.logger.isDebugEnabled()) {this.logger.debug("没有replyTopic来处理回复:" + result);}}else if (result instanceof Message) {this.replyTemplate.send((Message>) result);}其他{如果(结果实例集合){((Collection<V>) 结果).forEach(v -> {如果(v 消息实例){this.replyTemplate.send((Message>) v);}其他{this.replyTemplate.send(topic, v);}});}其他{sendSingleResult(result, topic, source);}}}
private void sendSingleResult(Object result, String topic, @Nullable Object source) {字节[]correlationId = null;boolean sourceIsMessage = 消息源实例;如果 (sourceIsMessage&&((Message>) source).getHeaders().get(KafkaHeaders.CORRELATION_ID) != null) {correlationId = ((Message>) source).getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);}如果(sourceIsMessage){sendReplyForMessageSource(结果,主题,来源,correlationId);}其他{this.replyTemplate.send(topic, result);}}@SuppressWarnings("未选中")私有无效sendReplyForMessageSource(对象结果,字符串主题,对象源,字节[]相关ID){MessageBuilder<对象>builder = MessageBuilder.withPayload(result).setHeader(KafkaHeaders.TOPIC, 主题);如果(this.replyHeadersConfigurer != null){映射<字符串,对象>headersToCopy = ((Message>) source).getHeaders().entrySet().stream().filter(e -> {String key = e.getKey();返回 !key.equals(MessageHeaders.ID) &&!key.equals(MessageHeaders.TIMESTAMP)&&!key.equals(KafkaHeaders.CORRELATION_ID)&&!key.startsWith(KafkaHeaders.RECEIVED);}).filter(e -> this.replyHeadersConfigurer.shouldCopy(e.getKey(), e.getValue())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));如果 (headersToCopy.size() > 0) {builder.copyHeaders(headersToCopy);}headersToCopy = this.replyHeadersConfigurer.additionalHeaders();如果 (!ObjectUtils.isEmpty(headersToCopy)) {builder.copyHeaders(headersToCopy);}}如果(correlationId != null){builder.setHeader(KafkaHeaders.CORRELATION_ID,correlationId);}setPartition(builder, ((Message>) source));this.replyTemplate.send(builder.build());}
source
现在是一个 Message -> sendReplyForMessageSource
将被调用.
经过短暂的调试后发现,如果输入是
的实例,MessagingMessageListenerAdapter 会调用此方法org.springframework.messaging.Message
这是不正确的;当侦听器方法返回Message>
(或Collection
)时调用它.
代码:
protected void sendResponse(Object result, String topic, @Nullable Object source, boolean messageReturnType) {if (!messageReturnType && topic == null) {如果 (this.logger.isDebugEnabled()) {this.logger.debug("没有replyTopic来处理回复:" + result);}}else if (result instanceof Message) {this.replyTemplate.send((Message>) result);}其他{如果(结果实例集合){((Collection<V>) 结果).forEach(v -> {如果(v 消息实例){this.replyTemplate.send((Message>) v);}其他{this.replyTemplate.send(topic, v);}});}其他{sendSingleResult(result, topic, source);}}}
自定义出站密钥的最简单方法是将方法更改为返回Message
.从该文档链接向下滚动到 ...
如果侦听器方法返回 Message 或 Collection>,则侦听器方法负责设置回复的消息头.例如,在处理来自 ReplyingKafkaTemplate 的请求时,您可以执行以下操作:
@KafkaListener(id = "messageReturned", topic = "someTopic")公共消息>听(字符串输入,@Header(KafkaHeaders.REPLY_TOPIC)字节 [] 回复,@Header(KafkaHeaders.CORRELATION_ID) byte[] 相关性) {返回 MessageBuilder.withPayload(in.toUpperCase()).setHeader(KafkaHeaders.TOPIC, 回复).setHeader(KafkaHeaders.MESSAGE_KEY, 42).setHeader(KafkaHeaders.CORRELATION_ID, 相关性).setHeader("someOtherHeader", "someValue").build();}
<块引用>
此外,仅当我不创建自己的 KafkaTemplate bean 时,spring boot 自动配置才起作用,这很烦人.
为了引导在回复模板中连接,它必须声明为 KafkaTemplate
.
I am creating a message processor that receives an event from one kafka topic, processes it and forwards the result to another topic.
I created a @KafkaListener
method with @SendTo
which works fine until I want to take control of the key generation of the outgoing message.
The documentation (2.2.4.RELEASE) suggest to create a bean by sub-classing KafkaTemplate
and override it's send(String topic, String data)
method.
Unfortunately this does not work because that method is not called in my case. send(Message<?> message)
is called on the other hand but that does not help. After a short debugging it turned out that MessagingMessageListenerAdapter
calls this method in case the input is instance of org.springframework.messaging.Message
and the result is not a List
. Unfortunately the RecordMessagingMessageListenerAdapter
always transforms the input to a Message
.
Am I using this annotation combination for something that was not the intention of the authors of spring kafka, is this a bug or is the documentation wrong?
Besides it is quite annoying that spring boot auto configuration works only if I don't create my own KafkaTemplate
bean.
If I create that overridden template then I have to create the KafkaListenerContainerFactory myself and set the replying template to make @SendTo
work again.
Here is my example code. As simple as it can be.
@SpringBootApplication
@Slf4j
public class SpringKafkaExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SpringKafkaExampleApplication.class, args);
}
@KafkaListener(topics = "${example.topics.input}")
@SendTo("${example.topics.output}")
public String process(final byte[] payload) {
String message = new String(payload, StandardCharsets.UTF_8);
log.info(message);
return message;
}
/*
//To set my custom KafkaTemplate as replyTemplate
@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(KafkaTemplate<String, String> kafkaTemplate,
ConsumerFactory<String, byte[]> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setReplyTemplate(kafkaTemplate);
return factory;
}
//My KafkaTemplate with overridden send(topic, data) method
@Bean
public KafkaTemplate<String, String> kafkaTempate(ProducerFactory<String, String> producerFactory) {
return new KafkaTemplate<String, String>(producerFactory) {
@Override
public ListenableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, "some_generated_key", data);
}
};
}
*/
}
UPDATE
stack trace ending in send(Message)
send:215, KafkaTemplate (org.springframework.kafka.core)
sendReplyForMessageSource:449, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)
sendSingleResult:416, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)
sendResponse:402, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)
handleResult:324, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)
onMessage:81, RecordMessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)
onMessage:50, RecordMessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)
RecordMessagingMessageListenerAdapter
Here the received record is transformed to a Message object.
@Override
public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
Message<?> message = toMessagingMessage(record, acknowledgment, consumer);
if (logger.isDebugEnabled()) {
logger.debug("Processing [" + message + "]");
}
try {
Object result = invokeHandler(record, acknowledgment, message, consumer);
if (result != null) {
handleResult(result, record, message);
}
}
MessagingMessageListenerAdapter
String
is returned by the KafkaListener method so sendSingleResult(result, topic, source)
will be called.
protected void sendResponse(Object result, String topic, @Nullable Object source, boolean messageReturnType) {
if (!messageReturnType && topic == null) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("No replyTopic to handle the reply: " + result);
}
}
else if (result instanceof Message) {
this.replyTemplate.send((Message<?>) result);
}
else {
if (result instanceof Collection) {
((Collection<V>) result).forEach(v -> {
if (v instanceof Message) {
this.replyTemplate.send((Message<?>) v);
}
else {
this.replyTemplate.send(topic, v);
}
});
}
else {
sendSingleResult(result, topic, source);
}
}
}
private void sendSingleResult(Object result, String topic, @Nullable Object source) {
byte[] correlationId = null;
boolean sourceIsMessage = source instanceof Message;
if (sourceIsMessage
&& ((Message<?>) source).getHeaders().get(KafkaHeaders.CORRELATION_ID) != null) {
correlationId = ((Message<?>) source).getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
}
if (sourceIsMessage) {
sendReplyForMessageSource(result, topic, source, correlationId);
}
else {
this.replyTemplate.send(topic, result);
}
}
@SuppressWarnings("unchecked")
private void sendReplyForMessageSource(Object result, String topic, Object source, byte[] correlationId) {
MessageBuilder<Object> builder = MessageBuilder.withPayload(result)
.setHeader(KafkaHeaders.TOPIC, topic);
if (this.replyHeadersConfigurer != null) {
Map<String, Object> headersToCopy = ((Message<?>) source).getHeaders().entrySet().stream()
.filter(e -> {
String key = e.getKey();
return !key.equals(MessageHeaders.ID) && !key.equals(MessageHeaders.TIMESTAMP)
&& !key.equals(KafkaHeaders.CORRELATION_ID)
&& !key.startsWith(KafkaHeaders.RECEIVED);
})
.filter(e -> this.replyHeadersConfigurer.shouldCopy(e.getKey(), e.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
if (headersToCopy.size() > 0) {
builder.copyHeaders(headersToCopy);
}
headersToCopy = this.replyHeadersConfigurer.additionalHeaders();
if (!ObjectUtils.isEmpty(headersToCopy)) {
builder.copyHeaders(headersToCopy);
}
}
if (correlationId != null) {
builder.setHeader(KafkaHeaders.CORRELATION_ID, correlationId);
}
setPartition(builder, ((Message<?>) source));
this.replyTemplate.send(builder.build());
}
source
is a Message now -> sendReplyForMessageSource
will be called.
After a short debugging it turned out that MessagingMessageListenerAdapter calls this method in case the input is instance of
org.springframework.messaging.Message
That is not correct; it's called when the listener method returns a Message<?>
(or Collection<Message<?>>
).
Code:
protected void sendResponse(Object result, String topic, @Nullable Object source, boolean messageReturnType) {
if (!messageReturnType && topic == null) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("No replyTopic to handle the reply: " + result);
}
}
else if (result instanceof Message) {
this.replyTemplate.send((Message<?>) result);
}
else {
if (result instanceof Collection) {
((Collection<V>) result).forEach(v -> {
if (v instanceof Message) {
this.replyTemplate.send((Message<?>) v);
}
else {
this.replyTemplate.send(topic, v);
}
});
}
else {
sendSingleResult(result, topic, source);
}
}
}
The simplest way to customize the outbound key is to change your method to return Message<String>
. Scroll down from that documentation link to ...
If the listener method returns Message or Collection>, the listener method is responsible for setting up the message headers for the reply. For example, when handling a request from a ReplyingKafkaTemplate, you might do the following:
@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
@Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.setHeader("someOtherHeader", "someValue")
.build();
}
Besides it is quite annoying that spring boot auto configuration works only if I don't create my own KafkaTemplate bean.
In order for boot to wire in the reply template, it must be declared as KafkaTemplate<Object, Object>
.
这篇关于使用 @SendTo 转发 spring @KafkaListener 结果的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!