将自定义 Java 对象发送到 Kafka 主题 [英] Send Custom Java Objects to Kafka Topic
问题描述
我有我的自定义 Java 对象,并希望利用内置序列化中的 JVM 将其发送到 Kafka 主题,但序列化失败并出现以下错误
I have my custom Java Object and wish to leverage JVM's in built serialization to send it to a Kafka topic, but serialization fails with below error
org.apache.kafka.common.errors.SerializationException:无法转换类 com.spring.kafka.Payload 到类的值org.apache.kafka.common.serialization.ByteArraySerializer 中指定value.serializer
org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.spring.kafka.Payload to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer
Payload.java
public class Payload implements Serializable {
private static final long serialVersionUID = 123L;
private String name="vinod";
private int anInt = 5;
private Double aDouble = new Double("5.0");
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public int getAnInt() {
return anInt;
}
public void setAnInt(int anInt) {
this.anInt = anInt;
}
public Double getaDouble() {
return aDouble;
}
public void setaDouble(Double aDouble) {
this.aDouble = aDouble;
}
}
在我创建生产者的过程中,我设置了以下属性
During my creation of producer, I have the following properties set
<entry key="key.serializer"
value="org.apache.kafka.common.serialization.ByteArraySerializer" />
<entry key="value.serializer"
value="org.apache.kafka.common.serialization.ByteArraySerializer" />
我的发送调用如下
kafkaProducer.send(new ProducerRecord<String, Payload>("test", new Payload()));
通过生产者将自定义 java 对象发送到 kafka 主题的正确方法是什么?
What is correct way to send a custom java object through a producer to a kafka topic ?
推荐答案
我们有以下 2 个选项
We have 2 Options as listed below
1) 如果我们打算将自定义 java 对象发送给生产者,我们需要创建一个实现 org.apache.kafka.common.serialization.Serializer 的序列化程序,并在创建过程中传递该序列化程序类你的制作人
1) If we intend to send custom java objects to producer, We need to create a serializer which implements org.apache.kafka.common.serialization.Serializer and pass that Serializer class during creation of your producer
下面的代码参考
public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer {
public void configure(Map map, boolean b) {
}
public byte[] serialize(String s, Object o) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(o);
oos.close();
byte[] b = baos.toByteArray();
return b;
} catch (IOException e) {
return new byte[0];
}
}
public void close() {
}
}
并相应地设置值序列化器
And set the value serializer accordingly
<entry key="value.serializer"
value="com.spring.kafka.PayloadSerializer" />
2) 无需创建自定义序列化程序类.使用现有的 ByteArraySerializer,但在发送过程中遵循流程
2) No need to create custom serializer class. Use the existing ByteArraySerializer, but during send follow the process
Java 对象 -> 字符串(最好是 JSON 表示而不是toString)->byteArray
Java Object -> String (Preferrably JSON represenation instead of toString)->byteArray
这篇关于将自定义 Java 对象发送到 Kafka 主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!