Spring Kafka consumer consumes the message as a LinkedHashMap, automatically converting BigDecimal to double
Question
I am using an annotation-based Spring Kafka listener to consume Kafka messages, and the code is as below.
- Consuming an Employee object
class Employee {
    private String name;
    private String address;
    private Object account;
    // getters
    // setters
}
- The account object is decided at runtime: it may be a saving account, a current account, and so on.
class SavingAcc {
    private BigDecimal balance;
}

class CurrentAcc {
    private BigDecimal balance;
    private BigDecimal limit;
}
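- Both the saving and the current account have BigDecimal fields to store the balance.
- When the Employee object is sent from the Kafka producer, all fields are mapped correctly and appear in the right format (BigDecimal, etc.).
- But when the Employee object is consumed in another service, the account object appears as a LinkedHashMap and the BigDecimal fields are converted to Double. This causes problems.
- As I understand it, the main cause is probably either a) account is declared as Object rather than as a specific type, or b) a more specific deserializer should be provided. [I already pass Employee.class as the type to the Kafka receiver's deserializer, so the Employee fields are mapped correctly, but the account fields are wrong.]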
@Bean
public ConsumerFactory<String, Employee> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(Employee.class));
}
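To illustrate why the conversion to Double is a problem for monetary values, here is a minimal standalone sketch. It is not part of the original code: the class name DoubleRoundTrip and the sample values are made up for illustration, and it only simulates a BigDecimal being read back through a double, without involving Kafka or Jackson.

```java
import java.math.BigDecimal;

public class DoubleRoundTrip {

    // Simulates a BigDecimal balance that is read back as a double on the consumer side.
    static BigDecimal viaDouble(BigDecimal original) {
        double asDouble = original.doubleValue();
        // new BigDecimal(double) exposes the exact binary value the double holds
        return new BigDecimal(asDouble);
    }

    public static void main(String[] args) {
        // Hypothetical sample balance with more digits than a double can hold
        BigDecimal balance = new BigDecimal("1234567890.12345678901");
        BigDecimal roundTripped = viaDouble(balance);

        System.out.println("original:      " + balance);
        System.out.println("round-tripped: " + roundTripped);
        System.out.println("same value:    " + (balance.compareTo(roundTripped) == 0));
    }
}
```

Values that are exactly representable in binary, such as 0.5, survive the round trip, while values such as 0.1 do not, so the damage depends on the data and may go unnoticed in simple tests.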
I need help with how to map the account field, or how to get it deserialized properly.
Recommended answer
Use generics and a custom JavaType method.
class Employee<T> {
    private String name;
    private String address;
    private T account;
    // getters
    // setters
}
private static final JavaType withCurrent = TypeFactory.defaultInstance().constructParametricType(Employee.class, CurrentAcc.class);
private static final JavaType withSaving = TypeFactory.defaultInstance().constructParametricType(Employee.class, SavingAcc.class);

public static JavaType determineType(String topic, byte[] data, Headers headers) {
    // isCurrentAccount is a hypothetical placeholder: inspect the data, headers, or topic here
    if (isCurrentAccount(topic, data, headers)) {
        return withCurrent;
    }
    return withSaving;
}
If you construct the deserializer yourself, use
deser.setTypeResolver(MyClass::determineType);
When configuring with properties, use
spring.json.value.type.method=com.mycompany.MyClass.determineType
You have to inspect the data or headers (or topic) to determine which type you want.
EDIT
Here is a complete example. In this case, I pass a type hint in the Account object, but an alternative would be to set a header on the producer side.
package com.example.demo;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.math.BigDecimal;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.header.Headers;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.jayway.jsonpath.JsonPath;

import lombok.Data;

@SpringBootApplication
public class JacksonApplication {

    public static void main(String[] args) {
        SpringApplication.run(JacksonApplication.class, args);
    }

    @Data
    public static class Employee<T extends Account> {
        private String name;
        private T account;
    }

    @Data
    public static abstract class Account {
        // Type hint written into the JSON: "C" for current, "S" for saving
        private final String type;

        protected Account(String type) {
            this.type = type;
        }
    }

    @Data
    public static class CurrentAccount extends Account {
        private BigDecimal balance;
        private BigDecimal limit;

        public CurrentAccount() {
            super("C");
        }
    }

    @Data
    public static class SavingAccount extends Account {
        private BigDecimal balance;

        public SavingAccount() {
            super("S");
        }
    }

    @KafkaListener(id = "empListener", topics = "employees")
    public void listen(Employee<Account> e) {
        System.out.println(e);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("employees").partitions(1).replicas(1).build();
    }

    // Sends one employee of each account type at startup
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Employee> template) {
        return args -> {
            Employee<CurrentAccount> emp1 = new Employee<>();
            emp1.setName("someOneWithACurrentAccount");
            CurrentAccount currentAccount = new CurrentAccount();
            currentAccount.setBalance(BigDecimal.ONE);
            currentAccount.setLimit(BigDecimal.TEN);
            emp1.setAccount(currentAccount);
            template.send("employees", emp1);

            Employee<SavingAccount> emp2 = new Employee<>();
            emp2.setName("someOneWithASavingAccount");
            SavingAccount savingAccount = new SavingAccount();
            savingAccount.setBalance(BigDecimal.ONE);
            emp2.setAccount(savingAccount);
            template.send("employees", emp2);
        };
    }

    private static final JavaType withCurrent = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, CurrentAccount.class);

    private static final JavaType withSaving = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, SavingAccount.class);

    // Called by the JsonDeserializer for each record; reads the type hint from the payload
    public static JavaType determineType(String topic, byte[] data, Headers headers) throws IOException {
        if (JsonPath.read(new ByteArrayInputStream(data), "$.account.type").equals("C")) {
            return withCurrent;
        }
        else {
            return withSaving;
        }
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.JacksonApplication.determineType
Result
JacksonApplication.Employee(name=someOneWithACurrentAccount, account=JacksonApplication.CurrentAccount(balance=1, limit=10))
JacksonApplication.Employee(name=someOneWithASavingAccount, account=JacksonApplication.SavingAccount(balance=1))
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>jackson</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
        <dependency>
            <groupId>com.jayway.jsonpath</groupId>
            <artifactId>json-path</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
EDIT2
And here is an example that conveys the type hint in a header instead...
package com.example.demo2;

import java.io.IOException;
import java.math.BigDecimal;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.serializer.JsonSerializer;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.type.TypeFactory;

import lombok.Data;

@SpringBootApplication
public class JacksonApplication {

    public static void main(String[] args) {
        SpringApplication.run(JacksonApplication.class, args);
    }

    @Data
    public static class Employee<T extends Account> {
        private String name;
        private T account;
    }

    @Data
    public static abstract class Account {
    }

    @Data
    public static class CurrentAccount extends Account {
        private BigDecimal balance;
        private BigDecimal limit;
    }

    @Data
    public static class SavingAccount extends Account {
        private BigDecimal balance;
    }

    @KafkaListener(id = "empListener", topics = "employees")
    public void listen(Employee<Account> e) {
        System.out.println(e);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("employees").partitions(1).replicas(1).build();
    }

    // Sends one employee of each account type at startup
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Employee> template) {
        return args -> {
            Employee<CurrentAccount> emp1 = new Employee<>();
            emp1.setName("someOneWithACurrentAccount");
            CurrentAccount currentAccount = new CurrentAccount();
            currentAccount.setBalance(BigDecimal.ONE);
            currentAccount.setLimit(BigDecimal.TEN);
            emp1.setAccount(currentAccount);
            template.send("employees", emp1);

            Employee<SavingAccount> emp2 = new Employee<>();
            emp2.setName("someOneWithASavingAccount");
            SavingAccount savingAccount = new SavingAccount();
            savingAccount.setBalance(BigDecimal.ONE);
            emp2.setAccount(savingAccount);
            template.send("employees", emp2);
        };
    }

    private static final JavaType withCurrent = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, CurrentAccount.class);

    private static final JavaType withSaving = TypeFactory.defaultInstance()
            .constructParametricType(Employee.class, SavingAccount.class);

    // Called by the JsonDeserializer for each record; reads the type hint from the header
    public static JavaType determineType(String topic, byte[] data, Headers headers) throws IOException {
        if (headers.lastHeader("accountType").value()[0] == 'C') {
            return withCurrent;
        }
        else {
            return withSaving;
        }
    }

    // Adds a one-byte "accountType" header ('C' or 'S') before delegating to JsonSerializer
    public static class MySerializer extends JsonSerializer<Employee<?>> {

        @Override
        public byte[] serialize(String topic, Headers headers, Employee<?> emp) {
            headers.add(new RecordHeader("accountType",
                    new byte[] { (byte) (emp.getAccount() instanceof CurrentAccount ? 'C' : 'S')}));
            return super.serialize(topic, headers, emp);
        }

    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.value-serializer=com.example.demo2.JacksonApplication.MySerializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo2.JacksonApplication.determineType
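Note that the header-based determineType above assumes every record carries an accountType header; a record produced without it would fail with a NullPointerException. Below is a minimal sketch of a more defensive decision step. The class AccountTypeHint, the enum Kind, and the method fromHeaderValue are hypothetical names introduced for illustration, and a plain byte array stands in for the Kafka header value so the sketch runs without any Kafka dependency.

```java
import java.util.Optional;

public class AccountTypeHint {

    // Hypothetical enum mirroring the 'C' / 'S' hint used in the example above
    enum Kind { CURRENT, SAVING }

    // headerValue stands in for the bytes of the "accountType" header;
    // pass null when the header is absent from the record.
    static Optional<Kind> fromHeaderValue(byte[] headerValue) {
        if (headerValue == null || headerValue.length == 0) {
            return Optional.empty(); // no hint available
        }
        byte hint = headerValue[0];
        if (hint == 'C') {
            return Optional.of(Kind.CURRENT);
        }
        if (hint == 'S') {
            return Optional.of(Kind.SAVING);
        }
        return Optional.empty(); // unknown hint
    }

    public static void main(String[] args) {
        System.out.println(fromHeaderValue(new byte[] { 'C' }));
        System.out.println(fromHeaderValue(new byte[] { 'S' }));
        System.out.println(fromHeaderValue(null));
    }
}
```

In a real resolver, the caller would check whether lastHeader returned null before reading its value, then map the result to withCurrent or withSaving, and decide what to do with an empty result (for example, fall back to a default type or reject the record).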