spring.cloud.stream.kafka.binder.headers 未按预期工作 [英] spring.cloud.stream.kafka.binder.headers not working as expected

查看:33
本文介绍了spring.cloud.stream.kafka.binder.headers 未按预期工作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用 spring.cloud.stream.kafka.binder.headers 来传输我根据之前的 问题.

我已阅读文档 哪里...

spring.cloud.stream.kafka.binder.headers将由绑定器传输的自定义标头列表.默认值:空.

似乎暗示设置列表(逗号分隔?)会导致自定义标头在 Message<> 中传输,但是一旦 kafka 写入完成,标头就会丢失.

我的注释创建标题作为对 MessagingGateway 调用的一部分:

@MessagingGateway(name = "redemptionGateway", defaultRequestChannel = Channels.GATEWAY_OUTPUT, defaultHeaders = @GatewayHeader(name = "orderId", expression = "#gatewayMethod.name"))公共接口 RedemptionGateway {...}

我观察到在第一次 preSend 调试中正确创建了标头:

2016-08-15 15:09:04 http-nio-8080-exec-2 DEBUG DirectChannel:430 - preSend on channel 'gatewayOutput',消息:GenericMessage [payload=x.TrivialRedemption@2d052d2a[orderId]=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f],headers={orderId=create,id=5dccea6f-266e-82b9-54c6-57ec441a26ac,timestamp=14712881444802}]:{clientSystemCode:00:0:0:0:1,clusterId=Cluster-Id-NA,containerId=Container-Id-NA,correlationId=UNDEFINED,domainName=defaultDomain,hostName=Host-NA,messageId=10.113.21.144-eb8404d0-de93-4f94-80cb-e5b638e8aeef,userId=匿名,webAnalyticsCorrelationId=|}

但是在下一次 preSend 时,标题丢失了:

2016-08-15 15:09:05 kafka-binder- DEBUG DirectChannel:430 - preSend on channel 'enrichingInput', message: GenericMessage [payload=x.TrivialRedemption@357bd4dd[orderId=f72b2d9b-4e43fa-95d4-1b0b368fe49f], headers={kafka_offset=10, orderId=create, kafka_messageKey=null, kafka_topic=received, kafka_partitionId=0, kafka_nextOffset=11, contentType=application/x-x-java-object};] - {}

我的属性包含:

<前>spring.cloud.stream.kafka.binder.headers=orderId

解决方案

你使用的是什么版本的 spring-cloud-stream?

我刚刚写了一个快速测试用例,它工作得很好...

spring.cloud.stream.kafka.binder.headers=barspring.cloud.stream.bindings.output.destination=foobarspring.cloud.stream.bindings.input.destination=foobarspring.cloud.stream.bindings.input.group=foo

应用:

package com.example;导入 org.springframework.beans.factory.annotation.Autowired;导入 org.springframework.boot.SpringApplication;导入 org.springframework.boot.autoconfigure.SpringBootApplication;导入 org.springframework.cloud.stream.annotation.EnableBinding;导入 org.springframework.cloud.stream.messaging.Processor;导入 org.springframework.context.ConfigurableApplicationContext;导入 org.springframework.context.annotation.Bean;导入 org.springframework.integration.support.MessageBuilder;导入 org.springframework.messaging.Message;导入 org.springframework.messaging.MessageHandler;导入 org.springframework.messaging.MessagingException;@SpringBootApplication@EnableBinding(Processor.class)公共类 So38961697Application {public static void main(String[] args) 抛出异常 {ConfigurableApplicationContext context = SpringApplication.run(So38961697Application.class, args);Foo foo = context.getBean(Foo.class);foo.start();foo.send();线程睡眠(30000);上下文.close();}@豆公共 Foo foo() {返回新的 Foo();}私有静态类 Foo {@自动连线处理器处理器;公共无效发送(){消息m = MessageBuilder.withPayload("foo").setHeader("bar", "baz").build();处理器.输出().发送(米);}公共无效开始(){this.processor.input().subscribe(new MessageHandler() {@覆盖public void handleMessage(Message m) 抛出 MessagingException {System.out.println(m);}});}}}

结果:

GenericMessage [payload=foo, headers={bar=baz, kafka_offset=0, kafka_messageKey=null, kafka_topic=foobar, kafka_partitionId=0, kafka_nextOffset=1, contentType=text/plain}]

完整的项目在这里.

查看评论,升级到 1.0.2.RELEASE 解决了问题

编辑

添加一个组,保证消费者从最早的消息开始消费.请参阅下面的评论.

I am trying to use spring.cloud.stream.kafka.binder.headers to transport a custom header that I am setting based upon a previous question.

I have read in the documentation where...

spring.cloud.stream.kafka.binder.headers
The list of custom headers that will be transported by the binder.

Default: empty.

seems to suggest that setting a list (comma separated?) will cause a custom header to get transported in the Message<>, but the header is lost as soon as the kafka write is completed.

My annotation creates the header as a part of the call to the MessagingGateway:

@MessagingGateway(name = "redemptionGateway", defaultRequestChannel = Channels.GATEWAY_OUTPUT, defaultHeaders = @GatewayHeader(name = "orderId", expression = "#gatewayMethod.name"))
public interface RedemptionGateway {
    ...
}

I observe that the header is properly created in the first preSend debug:

2016-08-15 15:09:04 http-nio-8080-exec-2 DEBUG DirectChannel:430 - preSend on channel 'gatewayOutput', message: GenericMessage [payload=x.TrivialRedemption@2d052d2a[orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f], headers={orderId=create, id=5dccea6f-266e-82b9-54c6-57ec441a26ac, timestamp=1471288144882}] - {applicationSystemCode=x, clientIP=0:0:0:0:0:0:0:1, clusterId=Cluster-Id-NA, containerId=Container-Id-NA, correlationId=UNDEFINED, domainName=defaultDomain, hostName=Host-NA, messageId=10.113.21.144-eb8404d0-de93-4f94-80cb-e5b638e8aeef, userId=anonymous, webAnalyticsCorrelationId=|}

But upon the next preSend, the header is missing:

2016-08-15 15:09:05 kafka-binder- DEBUG DirectChannel:430 - preSend on channel 'enrichingInput', message: GenericMessage [payload=x.TrivialRedemption@357bd4dd[orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f], headers={kafka_offset=10, orderId=create, kafka_messageKey=null, kafka_topic=received, kafka_partitionId=0, kafka_nextOffset=11, contentType=application/x-java-object;type=x.TrivialRedemption}] - {}

My properties contain:


    spring.cloud.stream.kafka.binder.headers=orderId

解决方案

What version of spring-cloud-stream are you using?

I just wrote a quick test case and it worked just fine...

spring.cloud.stream.kafka.binder.headers=bar
spring.cloud.stream.bindings.output.destination=foobar
spring.cloud.stream.bindings.input.destination=foobar
spring.cloud.stream.bindings.input.group=foo

App:

package com.example;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

@SpringBootApplication
@EnableBinding(Processor.class)
public class So38961697Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So38961697Application.class, args);
        Foo foo = context.getBean(Foo.class);
        foo.start();
        foo.send();
        Thread.sleep(30000);
        context.close();
    }

    @Bean
    public Foo foo() {
        return new Foo();
    }

    private static class Foo {

        @Autowired
        Processor processor;

        public void send() {
            Message<?> m = MessageBuilder.withPayload("foo")
                    .setHeader("bar", "baz")
                    .build();
            processor.output().send(m);
        }

        public void start() {
            this.processor.input().subscribe(new MessageHandler() {

                @Override
                public void handleMessage(Message<?> m) throws MessagingException {
                    System.out.println(m);
                }

            });
        }

    }

}

Result:

GenericMessage [payload=foo, headers={bar=baz, kafka_offset=0, kafka_messageKey=null, kafka_topic=foobar, kafka_partitionId=0, kafka_nextOffset=1, contentType=text/plain}]

The complete project is here.

Edit: See comment, upgrading to 1.0.2.RELEASE solved the issue

EDIT

Add a group to ensure the consumer consumes from the earliest message. See comment below.

这篇关于spring.cloud.stream.kafka.binder.headers 未按预期工作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆