Kafka Spring 集成:标题不适合 kafka 消费者 [英] Kafka Spring Integration: Headers not coming for kafka consumer

查看:17
本文介绍了Kafka Spring 集成:标题不适合 kafka 消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用 Kafka Spring Integration 来使用 kafka 发布和使用消息.我看到有效负载正确地从生产者传递给消费者,但标题信息在某处被覆盖.

I am using Kafka Spring Integration for publishing and consuming messages using kafka. I see Payload is properly passed from producer to consumer, but the header information is getting overridden somewhere.

@ServiceActivator(inputChannel = "fromKafka")
public void processMessage(Message<?> message) throws InterruptedException,
        ExecutionException {
    try {
            System.out.println("Headers :" + message.getHeaders().toString());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

我得到以下标题:

Headers :{timestamp=1440013920609, id=f8c645f7-677b-ec32-dad0-a7b79082ef81}

我正在像这样在生产者端构建消息:

I am constructing the message at producer end like this:

Message<FeelDBMessage> message = MessageBuilder
                .withPayload(samplePayloadObj)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "key")
                .setHeader(KafkaHeaders.TOPIC, "sampleTopic").build();

        // publish the message
        publisher.publishMessage(message);

以下是生产者的标题信息:

and below is the header info at producer:

 headers={timestamp=1440013914085, id=c4159c1c-2c67-634b-ef8d-3fb026b1172e, kafka_messageKey=key, kafka_topic=sampleTopic}

知道为什么标头会被不同的值覆盖吗?

Any idea why the Headers are overridden by a different value?

推荐答案

仅仅因为框架默认使用 immutable GenericMessage.

Just because by default Framework uses the immutable GenericMessage.

对现有消息的任何操作(例如 MessageBuilder.withPayload)都会产生一个新的 GenericMessage 实例.

Any manipulation to the existing message (e.g. MessageBuilder.withPayload) will produce a new GenericMessage instance.

另一方面,Kafka 不支持任何headers 抽象,如 JMS 或 AMQP.这就是为什么 KafkaProducerMessageHandler 在向 Kafka 发布消息时才这样做的原因:

From other side Kafka doesn't support any headers abstraction like JMS or AMQP. That's why KafkaProducerMessageHandler just do this when it publishes a message to Kafka:

this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());

如您所见,它根本不发送 headers.所以,另一方(消费者)只处理来自主题的 message 作为 payload 和一些系统选项作为 headerstopic, partition, messageKey.

As you see it doesn't send headers at all. So, other side (consumer) just deals with only message from the topic as a payload and some system options as headers like topic, partition, messageKey.

简而言之:我们不会通过 Kafka 传输标头,因为它不支持它们.

In two words: we don't transfer headers over Kafka because it doesn't support them.

这篇关于Kafka Spring 集成:标题不适合 kafka 消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆