将自定义 Java 对象发送到 Kafka 主题 [英] Send Custom Java Objects to Kafka Topic

查看:24
本文介绍了将自定义 Java 对象发送到 Kafka 主题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有我的自定义 Java 对象,并希望利用内置序列化中的 JVM 将其发送到 Kafka 主题,但序列化失败并出现以下错误

I have my custom Java Object and wish to leverage JVM's in built serialization to send it to a Kafka topic, but serialization fails with below error

org.apache.kafka.common.errors.SerializationException:无法转换类 com.spring.kafka.Payload 到类的值org.apache.kafka.common.serialization.ByteArraySerializer 中指定value.serializer

org.apache.kafka.common.errors.SerializationException: Can't convert value of class com.spring.kafka.Payload to class org.apache.kafka.common.serialization.ByteArraySerializer specified in value.serializer

Payload.java

public class Payload implements Serializable {

    private static final long serialVersionUID = 123L;

    private String name="vinod";

    private int anInt = 5;

    private Double aDouble = new Double("5.0");

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAnInt() {
        return anInt;
    }

    public void setAnInt(int anInt) {
        this.anInt = anInt;
    }

    public Double getaDouble() {
        return aDouble;
    }

    public void setaDouble(Double aDouble) {
        this.aDouble = aDouble;
    }

}

在我创建生产者的过程中,我设置了以下属性

During my creation of producer, I have the following properties set

<entry key="key.serializer"
                       value="org.apache.kafka.common.serialization.ByteArraySerializer" />
                <entry key="value.serializer"
                       value="org.apache.kafka.common.serialization.ByteArraySerializer" />

我的发送调用如下

kafkaProducer.send(new ProducerRecord<String, Payload>("test", new Payload()));

通过生产者将自定义 java 对象发送到 kafka 主题的正确方法是什么?

What is correct way to send a custom java object through a producer to a kafka topic ?

推荐答案

我们有以下 2 个选项

We have 2 Options as listed below

1) 如果我们打算将自定义 java 对象发送给生产者,我们需要创建一个实现 org.apache.kafka.common.serialization.Serializer 的序列化程序,并在创建过程中传递该序列化程序类你的制作人

1) If we intend to send custom java objects to producer, We need to create a serializer which implements org.apache.kafka.common.serialization.Serializer and pass that Serializer class during creation of your producer

下面的代码参考

public class PayloadSerializer implements org.apache.kafka.common.serialization.Serializer {

    public void configure(Map map, boolean b) {

    }

    public byte[] serialize(String s, Object o) {

       try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(baos);
            oos.writeObject(o);
            oos.close();
            byte[] b = baos.toByteArray();
            return b;
        } catch (IOException e) {
            return new byte[0];
        }
    }

    public void close() {

    }
}

并相应地设置值序列化器

And set the value serializer accordingly

<entry key="value.serializer"
                       value="com.spring.kafka.PayloadSerializer" />

2) 无需创建自定义序列化程序类.使用现有的 ByteArraySerializer,但在发送过程中遵循流程

2) No need to create custom serializer class. Use the existing ByteArraySerializer, but during send follow the process

Java 对象 -> 字符串(最好是 JSON 表示而不是toString)->byteArray

Java Object -> String (Preferrably JSON represenation instead of toString)->byteArray

这篇关于将自定义 Java 对象发送到 Kafka 主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆