Spring Kafka Consumer 将消息作为 LinkedHashMap 消费,因此自动将 BigDecimal 转换为 double [英] Spring Kafka Consumer consumed message as LinkedHashMap hence automatically converting BigDecimal to double

查看:36
本文介绍了Spring Kafka Consumer 将消息作为 LinkedHashMap 消费,因此自动将 BigDecimal 转换为 double的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用基于注解的 spring kafka 监听器来消费 kafka 消息,代码如下

I am using annotation based spring kafka listener to consume the kafka messages, and code is as below

  1. 使用员工对象

Class Employee{
private String name;
private String address;
private Object account;
//getters
//setters
}

  1. Account 对象在运行时决定它是 Saving Account 还是 Current Account 等.

Class SavingAcc{
private BigDecimal balance;

}
Class CurrentAcc{
private BigDecimal balance;
private BigDecimal limit;
}

  1. 储蓄&当前帐户具有 BigDecimal 字段来存储余额.
  2. 因此,当从 Kafka 生产者发送 Employee 对象时,所有字段都被正确映射并以正确的 BigDecimal 等格式出现.
  3. 但是在另一个服务中使用 Employee 对象时,帐户对象显示为 LinkedHashMap,而 BigDecimal 字段被转换为 Double.这会导致问题.
  4. 据我了解,主要原因可能是a) 将帐户声明为对象类型而不是特定类型b) 或者应该提供更具体的解串器.[我已经将 Employee.class 作为 kafka 接收器反序列化器的类型,因此 Employee 字段被正确映射但帐户字段错误].

@Bean
public ConsumerFactory<String, Employee> consumerFactory(){
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Employee.class));
}

需要有关如何映射或如何正确反序列化帐户字段的帮助.

Need help on how to map or how to get the account fields properly deserialize.

推荐答案

使用泛型和自定义 JavaType 方法.

Use Generics and a custom JavaType method.

Class Employee<T> {
private String name;
private String address;
private T account;
//getters
//setters
}

JavaType withCurrent = TypeFactory.defaultInstance().constructParametricType(Employee.class, CurrentAcc.class);

JavaType withSaving = TypeFactory.defaultInstance().constructParametricType(Employee.class, SavingAcc.class);

public static JavaType determineType(String topic, byte[] data, Headers headers) {
    // If it's a current account
        return withCurrent;
    // else 
        return withSaving;
}

如果您自己构建解串器,请使用

If you construct the deserializer yourself use

deser.setTypeResolver(MyClass::determineType);

使用属性进行配置时.

spring.json.value.type.method=com.mycompany.MyCass.determineType

您必须检查数据或标题(或主题)以确定所需的类型.

You have to inspect the data or headers (or topic) to determine which type you want.

编辑

这是一个完整的例子.在这种情况下,我在 Account 对象中传递了一个类型提示,但另一种方法是在生产者端设置一个标头.

Here is a complete example. In this case, I pass a type hint in the Account object, but an alternative would be to set a header on the producer side.

@SpringBootApplication
public class JacksonApplication {

    public static void main(String[] args) {
        SpringApplication.run(JacksonApplication.class, args);
    }

    @Data
    public static class Employee<T extends Account> {
        private String name;
        private T account;
    }

    @Data
    public static abstract class Account {
        private final String type;
        protected Account(String type) {
            this.type = type;
        }
    }

    @Data
    public static class CurrentAccount extends Account {
        private BigDecimal balance;
        private BigDecimal limit;
        public CurrentAccount() {
            super("C");
        }
    }

    @Data
    public static class SavingAccount extends Account {
        private BigDecimal balance;
        public SavingAccount() {
            super("S");
        }
    }

    @KafkaListener(id = "empListener", topics = "employees")
    public void listen(Employee<Account> e) {
        System.out.println(e);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("employees").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Employee> template) {
        return args -> {
            Employee<CurrentAccount> emp1 = new Employee<>();
            emp1.setName("someOneWithACurrentAccount");
            CurrentAccount currentAccount = new CurrentAccount();
            currentAccount.setBalance(BigDecimal.ONE);
            currentAccount.setLimit(BigDecimal.TEN);
            emp1.setAccount(currentAccount);
            template.send("employees", emp1);
            Employee<SavingAccount> emp2 = new Employee<>();
            emp2.setName("someOneWithASavingAccount");
            SavingAccount savingAccount = new SavingAccount();
            savingAccount.setBalance(BigDecimal.ONE);
            emp2.setAccount(savingAccount);
            template.send("employees", emp2);
        };
    }

    private static final JavaType withCurrent = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, CurrentAccount.class);

    private static final JavaType withSaving = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, SavingAccount.class);

    public static JavaType determineType(String topic, byte[] data, Headers headers) throws IOException {
        if (JsonPath.read(new ByteArrayInputStream(data), "$.account.type").equals("C")) {
            return withCurrent;
        }
        else {
            return withSaving;
        }
    }

}

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.JacksonApplication.determineType

结果

JacksonApplication.Employee(name=someOneWithACurrentAccount, account=JacksonApplication.CurrentAccount(balance=1, limit=10))
JacksonApplication.Employee(name=someOneWithASavingAccount, account=JacksonApplication.SavingAccount(balance=1))

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>jackson</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>com.jayway.jsonpath</groupId>
            <artifactId>json-path</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

EDIT2

这是一个在标题中传达类型提示的示例......

And here is an example that conveys the type hint in a header instead...

@SpringBootApplication
public class JacksonApplication {

    public static void main(String[] args) {
        SpringApplication.run(JacksonApplication.class, args);
    }

    @Data
    public static class Employee<T extends Account> {
        private String name;
        private T account;
    }

    @Data
    public static abstract class Account {
    }

    @Data
    public static class CurrentAccount extends Account {
        private BigDecimal balance;
        private BigDecimal limit;
    }

    @Data
    public static class SavingAccount extends Account {
        private BigDecimal balance;
    }

    @KafkaListener(id = "empListener", topics = "employees")
    public void listen(Employee<Account> e) {
        System.out.println(e);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("employees").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Employee> template) {
        return args -> {
            Employee<CurrentAccount> emp1 = new Employee<>();
            emp1.setName("someOneWithACurrentAccount");
            CurrentAccount currentAccount = new CurrentAccount();
            currentAccount.setBalance(BigDecimal.ONE);
            currentAccount.setLimit(BigDecimal.TEN);
            emp1.setAccount(currentAccount);
            template.send("employees", emp1);
            Employee<SavingAccount> emp2 = new Employee<>();
            emp2.setName("someOneWithASavingAccount");
            SavingAccount savingAccount = new SavingAccount();
            savingAccount.setBalance(BigDecimal.ONE);
            emp2.setAccount(savingAccount);
            template.send("employees", emp2);
        };
    }

    private static final JavaType withCurrent = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, CurrentAccount.class);

    private static final JavaType withSaving = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, SavingAccount.class);

    public static JavaType determineType(String topic, byte[] data, Headers headers) throws IOException {
        if (headers.lastHeader("accountType").value()[0] == 'C') {
            return withCurrent;
        }
        else {
            return withSaving;
        }
    }

    public static class MySerializer extends JsonSerializer<Employee<?>> {

        @Override
        public byte[] serialize(String topic, Headers headers, Employee<?> emp) {
            headers.add(new RecordHeader("accountType",
                    new byte[] { (byte) (emp.getAccount() instanceof CurrentAccount ? 'C' : 'S')}));
            return super.serialize(topic, headers, emp);
        }

    }

}

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=com.example.demo2.JacksonApplication.MySerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo2.JacksonApplication.determineType

这篇关于Spring Kafka Consumer 将消息作为 LinkedHashMap 消费,因此自动将 BigDecimal 转换为 double的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆