Spring Kafka consumer receives the message as LinkedHashMap, so BigDecimal is automatically converted to double

Problem description

I am using an annotation-based Spring Kafka listener to consume Kafka messages. The code is as follows:

  1. The Employee object being consumed:

class Employee {
    private String name;
    private String address;
    private Object account; // a SavingAcc or a CurrentAcc, decided at runtime
    // getters
    // setters
}

  1. The account object is decided at runtime; it can be a savings account, a current account, etc.

class SavingAcc {
    private BigDecimal balance;
}

class CurrentAcc {
    private BigDecimal balance;
    private BigDecimal limit;
}

  1. The savings and current accounts have BigDecimal fields to store the balance.
  2. When the Employee object is sent from the Kafka producer, all fields are mapped correctly and the BigDecimal values appear in the correct format.
  3. When the Employee object is consumed in another service, the account object arrives as a LinkedHashMap and its BigDecimal fields are converted to Double, which causes issues.
  4. As far as I understand, the likely cause is either a) account is declared as Object instead of a specific type, or b) the deserializer needs to be configured more specifically. [I have already given Employee.class as the type to the Kafka receiver's deserializer, so the Employee fields are mapped correctly, but the account fields are wrong.]

@Bean
public ConsumerFactory<String, Employee> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(Employee.class));
}

I need help with how to map the account field so that it is deserialized properly.
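
To illustrate why the conversion to Double described in the question is a problem, here is a minimal stand-alone sketch that uses only java.math.BigDecimal and involves neither Kafka nor Jackson. The class name DoubleRoundTrip and the helper viaDouble are made up for illustration; the helper only approximates what happens to a decimal value once it has passed through a double on the consuming side.

```java
import java.math.BigDecimal;

class DoubleRoundTrip {

    // Hypothetical helper: pushes a decimal value through a double and back,
    // roughly what an untyped JSON number goes through on the consumer.
    static BigDecimal viaDouble(BigDecimal value) {
        return BigDecimal.valueOf(value.doubleValue());
    }

    public static void main(String[] args) {
        // A value with more significant digits than a double can hold.
        BigDecimal balance = new BigDecimal("12345678901234567.89");
        System.out.println(balance + " -> " + viaDouble(balance));

        // A value that survives numerically but loses its scale.
        BigDecimal limit = new BigDecimal("10.50");
        System.out.println("scale " + limit.scale() + " -> scale " + viaDouble(limit).scale());
    }
}
```

The first value no longer compares equal after the round trip, because a double holds only about 17 significant decimal digits. The second keeps its numeric value but loses its trailing zero, so equals() on the two BigDecimal instances returns false even though compareTo() returns 0.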

Recommended answer

Use generics and a custom JavaType method.

class Employee<T> {
    private String name;
    private String address;
    private T account;
    // getters
    // setters
}

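// JavaType and TypeFactory are Jackson classes (com.fasterxml.jackson.databind).
private static final JavaType withCurrent = TypeFactory.defaultInstance().constructParametricType(Employee.class, CurrentAcc.class);

private static final JavaType withSaving = TypeFactory.defaultInstance().constructParametricType(Employee.class, SavingAcc.class);

public static JavaType determineType(String topic, byte[] data, Headers headers) {
    // isCurrentAccount is a placeholder for your own check of the topic, data, or headers.
    if (isCurrentAccount(topic, data, headers)) {
        return withCurrent;
    }
    return withSaving;
}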

If you construct the deserializer yourself, use:

deser.setTypeResolver(MyClass::determineType);

Or, when configuring with properties:

spring.json.value.type.method=com.mycompany.MyClass.determineType

You have to inspect the data or headers (or topic) to determine which type you want.
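
As a rough illustration of the two cheaper options (topic and header), here is a stand-alone sketch that uses only the JDK. Everything in it is hypothetical: the AccountKind enum stands in for the two JavaType constants, the ".current" topic suffix assumes one topic per account kind, and the single-byte 'C'/'S' hint is just one possible header encoding. A real determineType method would return the matching JavaType instead of an enum value.

```java
import java.nio.charset.StandardCharsets;

class TypeHintSketch {

    // Hypothetical stand-in for the withCurrent / withSaving JavaType constants.
    enum AccountKind { CURRENT, SAVING }

    // Option 1: decide by topic name (assumes one topic per account kind).
    static AccountKind fromTopic(String topic) {
        return topic.endsWith(".current") ? AccountKind.CURRENT : AccountKind.SAVING;
    }

    // Option 2: decide by the value of a header written by the producer.
    static AccountKind fromHeader(byte[] headerValue) {
        if (headerValue == null || headerValue.length == 0) {
            // Fail loudly rather than silently guessing a type.
            throw new IllegalArgumentException("missing account type hint");
        }
        return headerValue[0] == 'C' ? AccountKind.CURRENT : AccountKind.SAVING;
    }

    public static void main(String[] args) {
        System.out.println(fromTopic("employees.current"));
        System.out.println(fromHeader("S".getBytes(StandardCharsets.US_ASCII)));
    }
}
```

Deciding from the topic or a header avoids parsing the payload twice; inspecting the data itself is the fallback when the producer cannot be changed.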

EDIT

Here is a complete example. In this case, I pass a type hint in the Account object, but an alternative would be to set a header on the producer side.

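package com.example.demo;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.math.BigDecimal;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.header.Headers;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.jayway.jsonpath.JsonPath;

import lombok.Data;

@SpringBootApplication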
public class JacksonApplication {

    public static void main(String[] args) {
        SpringApplication.run(JacksonApplication.class, args);
    }

    @Data
    public static class Employee<T extends Account> {
        private String name;
        private T account;
    }

    @Data
    public static abstract class Account {
        private final String type;
        protected Account(String type) {
            this.type = type;
        }
    }

    @Data
    public static class CurrentAccount extends Account {
        private BigDecimal balance;
        private BigDecimal limit;
        public CurrentAccount() {
            super("C");
        }
    }

    @Data
    public static class SavingAccount extends Account {
        private BigDecimal balance;
        public SavingAccount() {
            super("S");
        }
    }

    @KafkaListener(id = "empListener", topics = "employees")
    public void listen(Employee<Account> e) {
        System.out.println(e);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("employees").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Employee> template) {
        return args -> {
            Employee<CurrentAccount> emp1 = new Employee<>();
            emp1.setName("someOneWithACurrentAccount");
            CurrentAccount currentAccount = new CurrentAccount();
            currentAccount.setBalance(BigDecimal.ONE);
            currentAccount.setLimit(BigDecimal.TEN);
            emp1.setAccount(currentAccount);
            template.send("employees", emp1);
            Employee<SavingAccount> emp2 = new Employee<>();
            emp2.setName("someOneWithASavingAccount");
            SavingAccount savingAccount = new SavingAccount();
            savingAccount.setBalance(BigDecimal.ONE);
            emp2.setAccount(savingAccount);
            template.send("employees", emp2);
        };
    }

    private static final JavaType withCurrent = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, CurrentAccount.class);

    private static final JavaType withSaving = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, SavingAccount.class);

    public static JavaType determineType(String topic, byte[] data, Headers headers) throws IOException {
        if (JsonPath.read(new ByteArrayInputStream(data), "$.account.type").equals("C")) {
            return withCurrent;
        }
        else {
            return withSaving;
        }
    }

}

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.JacksonApplication.determineType

Result

JacksonApplication.Employee(name=someOneWithACurrentAccount, account=JacksonApplication.CurrentAccount(balance=1, limit=10))
JacksonApplication.Employee(name=someOneWithASavingAccount, account=JacksonApplication.SavingAccount(balance=1))

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>jackson</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>com.jayway.jsonpath</groupId>
            <artifactId>json-path</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

EDIT2

And here is an example that conveys the type hint in a header instead.

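package com.example.demo2;

import java.io.IOException;
import java.math.BigDecimal;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;

import lombok.Data;

@SpringBootApplication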
public class JacksonApplication {

    public static void main(String[] args) {
        SpringApplication.run(JacksonApplication.class, args);
    }

    @Data
    public static class Employee<T extends Account> {
        private String name;
        private T account;
    }

    @Data
    public static abstract class Account {
    }

    @Data
    public static class CurrentAccount extends Account {
        private BigDecimal balance;
        private BigDecimal limit;
    }

    @Data
    public static class SavingAccount extends Account {
        private BigDecimal balance;
    }

    @KafkaListener(id = "empListener", topics = "employees")
    public void listen(Employee<Account> e) {
        System.out.println(e);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("employees").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Employee> template) {
        return args -> {
            Employee<CurrentAccount> emp1 = new Employee<>();
            emp1.setName("someOneWithACurrentAccount");
            CurrentAccount currentAccount = new CurrentAccount();
            currentAccount.setBalance(BigDecimal.ONE);
            currentAccount.setLimit(BigDecimal.TEN);
            emp1.setAccount(currentAccount);
            template.send("employees", emp1);
            Employee<SavingAccount> emp2 = new Employee<>();
            emp2.setName("someOneWithASavingAccount");
            SavingAccount savingAccount = new SavingAccount();
            savingAccount.setBalance(BigDecimal.ONE);
            emp2.setAccount(savingAccount);
            template.send("employees", emp2);
        };
    }

    private static final JavaType withCurrent = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, CurrentAccount.class);

    private static final JavaType withSaving = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, SavingAccount.class);

    public static JavaType determineType(String topic, byte[] data, Headers headers) throws IOException {
        if (headers.lastHeader("accountType").value()[0] == 'C') {
            return withCurrent;
        }
        else {
            return withSaving;
        }
    }

    public static class MySerializer extends JsonSerializer<Employee<?>> {

        @Override
        public byte[] serialize(String topic, Headers headers, Employee<?> emp) {
            headers.add(new RecordHeader("accountType",
                    new byte[] { (byte) (emp.getAccount() instanceof CurrentAccount ? 'C' : 'S')}));
            return super.serialize(topic, headers, emp);
        }

    }

}

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=com.example.demo2.JacksonApplication.MySerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo2.JacksonApplication.determineType
