使用 @SendTo 转发 spring @KafkaListener 结果 [英] Forwarding spring @KafkaListener result with @SendTo

查看:37
本文介绍了使用 @SendTo 转发 spring @KafkaListener 结果的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在创建一个消息处理器,它接收来自一个 kafka 主题的事件,对其进行处理并将结果转发到另一个主题.

我使用 @SendTo 创建了一个 @KafkaListener 方法,该方法可以正常工作,直到我想控制传出消息的密钥生成.

文档 (2.2.4.RELEASE) 建议通过子类化 KafkaTemplate 创建一个 bean,并覆盖它的 send(String topic, String data) 方法.

不幸的是,这不起作用,因为在我的情况下没有调用该方法.send(Message message) 另一方面被调用,但这没有帮助.经过短暂的调试,结果证明 MessagingMessageListenerAdapter 调用此方法,以防输入是 org.springframework.messaging.Message 的实例而结果不是 List.不幸的是,RecordMessagingMessageListenerAdapter 总是将 input 转换为 Message.我是否将这个注解组合用于不是 spring kafka 的作者的意图,这是一个错误还是文档错误?

此外,仅当我不创建自己的 KafkaTemplate bean 时,spring boot 自动配置才起作用,这很烦人.如果我创建了覆盖的模板,那么我必须自己创建 KafkaListenerContainerFactory 并设置回复模板以使 @SendTo 再次工作.

这是我的示例代码.尽可能简单.

@SpringBootApplication@Slf4j公共类 SpringKafkaExampleApplication {公共静态无效主(字符串 [] args){SpringApplication.run(SpringKafkaExampleApplication.class, args);}@KafkaListener(topics = "${example.topics.input}")@SendTo("${example.topics.output}")公共字符串过程(最终字节[]有效载荷){String message = new String(payload, StandardCharsets.UTF_8);日志信息(消息);回消息;}/*//将我的自定义KafkaTemplate设置为replyTemplate@豆public ConcurrentKafkaListenerContainerFactorykafkaListenerContainerFactory(KafkaTemplate kafkaTemplate,ConsumerFactory消费者工厂) {ConcurrentKafkaListenerContainerFactory工厂 =新的 ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.setReplyTemplate(kafkaTemplate);返厂;}//我的 KafkaTemplate 带有覆盖的 send(topic, data) 方法@豆公共 KafkaTemplatekafkaTempate(ProducerFactoryproducerFactory) {返回新的 KafkaTemplate(producerFactory) {@覆盖public ListenableFuture>发送(字符串主题,字符串数据){return super.send(topic, "some_generated_key", data);}};}*/}

更新

以 send(Message) 结尾的堆栈跟踪

send:215, KafkaTemplate (org.springframework.kafka.core)sendReplyForMessageSource:449, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)sendSingleResult:416, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)sendResponse:402, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)handleResult:324, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)onMessage:81, RecordMessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)onMessage:50, RecordMessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)

RecordMessagingMessageListenerAdapter

这里将接收到的记录转换为 Message 对象.

 @Overridepublic void onMessage(ConsumerRecord record, Acknowledgement Acknowledgement, Consumer consumer) {消息message = toMessagingMessage(记录、确认、消费者);如果 (logger.isDebugEnabled()) {logger.debug("正在处理 [" + 消息 + "]");}试试{对象结果 = invokeHandler(记录、确认、消息、消费者);如果(结果!= null){处理结果(结果,记录,消息);}}

MessagingMessageListenerAdapter

String 由 KafkaListener 方法返回,因此 sendSingleResult(result, topic, source) 将被调用.

 protected void sendResponse(Object result, String topic, @Nullable Object source, boolean messageReturnType) {if (!messageReturnType && topic == null) {如果 (this.logger.isDebugEnabled()) {this.logger.debug("没有replyTopic来处理回复:" + result);}}else if (result instanceof Message) {this.replyTemplate.send((Message) result);}其他{如果(结果实例集合){((Collection<V>) 结果).forEach(v -> {如果(v 消息实例){this.replyTemplate.send((Message) v);}其他{this.replyTemplate.send(topic, v);}});}其他{sendSingleResult(result, topic, source);}}}

 private void sendSingleResult(Object result, String topic, @Nullable Object source) {字节[]correlationId = null;boolean sourceIsMessage = 消息源实例;如果 (sourceIsMessage&&((Message) source).getHeaders().get(KafkaHeaders.CORRELATION_ID) != null) {correlationId = ((Message) source).getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);}如果(sourceIsMessage){sendReplyForMessageSource(结果,主题,来源,correlationId);}其他{this.replyTemplate.send(topic, result);}}@SuppressWarnings("未选中")私有无效sendReplyForMessageSource(对象结果,字符串主题,对象源,字节[]相关ID){MessageBuilder<对象>builder = MessageBuilder.withPayload(result).setHeader(KafkaHeaders.TOPIC, 主题);如果(this.replyHeadersConfigurer != null){映射<字符串,对象>headersToCopy = ((Message) source).getHeaders().entrySet().stream().filter(e -> {String key = e.getKey();返回 !key.equals(MessageHeaders.ID) &&!key.equals(MessageHeaders.TIMESTAMP)&&!key.equals(KafkaHeaders.CORRELATION_ID)&&!key.startsWith(KafkaHeaders.RECEIVED);}).filter(e -> this.replyHeadersConfigurer.shouldCopy(e.getKey(), e.getValue())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));如果 (headersToCopy.size() > 0) {builder.copyHeaders(headersToCopy);}headersToCopy = this.replyHeadersConfigurer.additionalHeaders();如果 (!ObjectUtils.isEmpty(headersToCopy)) {builder.copyHeaders(headersToCopy);}}如果(correlationId != null){builder.setHeader(KafkaHeaders.CORRELATION_ID,correlationId);}setPartition(builder, ((Message) source));this.replyTemplate.send(builder.build());}

source 现在是一个 Message -> sendReplyForMessageSource 将被调用.

解决方案

经过短暂的调试后发现,如果输入是 org.springframework.messaging.Message

的实例,MessagingMessageListenerAdapter 会调用此方法

这是不正确的;当侦听器方法返回Message(或Collection)时调用它.

代码:

protected void sendResponse(Object result, String topic, @Nullable Object source, boolean messageReturnType) {if (!messageReturnType && topic == null) {如果 (this.logger.isDebugEnabled()) {this.logger.debug("没有replyTopic来处理回复:" + result);}}else if (result instanceof Message) {this.replyTemplate.send((Message) result);}其他{如果(结果实例集合){((Collection<V>) 结果).forEach(v -> {如果(v 消息实例){this.replyTemplate.send((Message) v);}其他{this.replyTemplate.send(topic, v);}});}其他{sendSingleResult(result, topic, source);}}}

自定义出站密钥的最简单方法是将方法更改为返回Message.从该文档链接向下滚动到 ...

<块引用>

如果侦听器方法返回 Message 或 Collection>,则侦听器方法负责设置回复的消息头.例如,在处理来自 ReplyingKafkaTemplate 的请求时,您可以执行以下操作:

@KafkaListener(id = "messageReturned", topic = "someTopic")公共消息听(字符串输入,@Header(KafkaHeaders.REPLY_TOPIC)字节 [] 回复,@Header(KafkaHeaders.CORRELATION_ID) byte[] 相关性) {返回 MessageBuilder.withPayload(in.toUpperCase()).setHeader(KafkaHeaders.TOPIC, 回复).setHeader(KafkaHeaders.MESSAGE_KEY, 42).setHeader(KafkaHeaders.CORRELATION_ID, 相关性).setHeader("someOtherHeader", "someValue").build();}

<块引用>

此外,仅当我不创建自己的 KafkaTemplate bean 时,spring boot 自动配置才起作用,这很烦人.

为了引导在回复模板中连接,它必须声明为 KafkaTemplate.

I am creating a message processor that receives an event from one kafka topic, processes it and forwards the result to another topic.

I created a @KafkaListener method with @SendTo which works fine until I want to take control of the key generation of the outgoing message.

The documentation (2.2.4.RELEASE) suggest to create a bean by sub-classing KafkaTemplate and override it's send(String topic, String data) method.

Unfortunately this does not work because that method is not called in my case. send(Message<?> message) is called on the other hand but that does not help. After a short debugging it turned out that MessagingMessageListenerAdapter calls this method in case the input is instance of org.springframework.messaging.Message and the result is not a List. Unfortunately the RecordMessagingMessageListenerAdapter always transforms the input to a Message. Am I using this annotation combination for something that was not the intention of the authors of spring kafka, is this a bug or is the documentation wrong?

Besides it is quite annoying that spring boot auto configuration works only if I don't create my own KafkaTemplate bean. If I create that overridden template then I have to create the KafkaListenerContainerFactory myself and set the replying template to make @SendTo work again.

Here is my example code. As simple as it can be.

@SpringBootApplication
@Slf4j
public class SpringKafkaExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaExampleApplication.class, args);
    }

    @KafkaListener(topics = "${example.topics.input}")
    @SendTo("${example.topics.output}")
    public String process(final byte[] payload) {
        String message = new String(payload, StandardCharsets.UTF_8);
        log.info(message);
        return message;
    }

/*
    //To set my custom KafkaTemplate as replyTemplate
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory(KafkaTemplate<String, String> kafkaTemplate,
                                                                                                 ConsumerFactory<String, byte[]> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setReplyTemplate(kafkaTemplate);
        return factory;
    }

    //My KafkaTemplate with overridden send(topic, data) method
    @Bean
    public KafkaTemplate<String, String> kafkaTempate(ProducerFactory<String, String> producerFactory) {
        return new KafkaTemplate<String, String>(producerFactory) {

            @Override
            public ListenableFuture<SendResult<String, String>> send(String topic, String data) {
                return super.send(topic, "some_generated_key", data);
            }
        };
    }
    */
}

UPDATE

stack trace ending in send(Message)

send:215, KafkaTemplate (org.springframework.kafka.core)
sendReplyForMessageSource:449, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)
sendSingleResult:416, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)
sendResponse:402, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)
handleResult:324, MessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)
onMessage:81, RecordMessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)
onMessage:50, RecordMessagingMessageListenerAdapter (org.springframework.kafka.listener.adapter)

RecordMessagingMessageListenerAdapter

Here the received record is transformed to a Message object.

    @Override
    public void onMessage(ConsumerRecord<K, V> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        Message<?> message = toMessagingMessage(record, acknowledgment, consumer);
        if (logger.isDebugEnabled()) {
            logger.debug("Processing [" + message + "]");
        }
        try {
            Object result = invokeHandler(record, acknowledgment, message, consumer);
            if (result != null) {
                handleResult(result, record, message);
            }
        }

MessagingMessageListenerAdapter

String is returned by the KafkaListener method so sendSingleResult(result, topic, source) will be called.

    protected void sendResponse(Object result, String topic, @Nullable Object source, boolean messageReturnType) {
        if (!messageReturnType && topic == null) {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("No replyTopic to handle the reply: " + result);
            }
        }
        else if (result instanceof Message) {
            this.replyTemplate.send((Message<?>) result);
        }
        else {
            if (result instanceof Collection) {
                ((Collection<V>) result).forEach(v -> {
                    if (v instanceof Message) {
                        this.replyTemplate.send((Message<?>) v);
                    }
                    else {
                        this.replyTemplate.send(topic, v);
                    }
                });
            }
            else {
                sendSingleResult(result, topic, source);
            }
        }
    }

    private void sendSingleResult(Object result, String topic, @Nullable Object source) {
        byte[] correlationId = null;
        boolean sourceIsMessage = source instanceof Message;
        if (sourceIsMessage
                && ((Message<?>) source).getHeaders().get(KafkaHeaders.CORRELATION_ID) != null) {
            correlationId = ((Message<?>) source).getHeaders().get(KafkaHeaders.CORRELATION_ID, byte[].class);
        }
        if (sourceIsMessage) {
            sendReplyForMessageSource(result, topic, source, correlationId);
        }
        else {
            this.replyTemplate.send(topic, result);
        }
    }

    @SuppressWarnings("unchecked")
    private void sendReplyForMessageSource(Object result, String topic, Object source, byte[] correlationId) {
        MessageBuilder<Object> builder = MessageBuilder.withPayload(result)
                .setHeader(KafkaHeaders.TOPIC, topic);
        if (this.replyHeadersConfigurer != null) {
            Map<String, Object> headersToCopy = ((Message<?>) source).getHeaders().entrySet().stream()
                .filter(e -> {
                    String key = e.getKey();
                    return !key.equals(MessageHeaders.ID) && !key.equals(MessageHeaders.TIMESTAMP)
                            && !key.equals(KafkaHeaders.CORRELATION_ID)
                            && !key.startsWith(KafkaHeaders.RECEIVED);
                })
                .filter(e -> this.replyHeadersConfigurer.shouldCopy(e.getKey(), e.getValue()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            if (headersToCopy.size() > 0) {
                builder.copyHeaders(headersToCopy);
            }
            headersToCopy = this.replyHeadersConfigurer.additionalHeaders();
            if (!ObjectUtils.isEmpty(headersToCopy)) {
                builder.copyHeaders(headersToCopy);
            }
        }
        if (correlationId != null) {
            builder.setHeader(KafkaHeaders.CORRELATION_ID, correlationId);
        }
        setPartition(builder, ((Message<?>) source));
        this.replyTemplate.send(builder.build());
    }

source is a Message now -> sendReplyForMessageSource will be called.

解决方案

After a short debugging it turned out that MessagingMessageListenerAdapter calls this method in case the input is instance of org.springframework.messaging.Message

That is not correct; it's called when the listener method returns a Message<?> (or Collection<Message<?>>).

Code:

protected void sendResponse(Object result, String topic, @Nullable Object source, boolean messageReturnType) {
    if (!messageReturnType && topic == null) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("No replyTopic to handle the reply: " + result);
        }
    }
    else if (result instanceof Message) {
        this.replyTemplate.send((Message<?>) result);
    }
    else {
        if (result instanceof Collection) {
            ((Collection<V>) result).forEach(v -> {
                if (v instanceof Message) {
                    this.replyTemplate.send((Message<?>) v);
                }
                else {
                    this.replyTemplate.send(topic, v);
                }
            });
        }
        else {
            sendSingleResult(result, topic, source);
        }
    }
}

The simplest way to customize the outbound key is to change your method to return Message<String>. Scroll down from that documentation link to ...

If the listener method returns Message or Collection>, the listener method is responsible for setting up the message headers for the reply. For example, when handling a request from a ReplyingKafkaTemplate, you might do the following:

@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
        @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {

    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.MESSAGE_KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader("someOtherHeader", "someValue")
            .build();
}

Besides it is quite annoying that spring boot auto configuration works only if I don't create my own KafkaTemplate bean.

In order for boot to wire in the reply template, it must be declared as KafkaTemplate<Object, Object>.

这篇关于使用 @SendTo 转发 spring @KafkaListener 结果的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆