如何在 kafka 中创建自定义序列化程序? [英] How to create Custom serializer in kafka?

查看:39
本文介绍了如何在 kafka 中创建自定义序列化程序?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

只有很少的序列化程序可用,例如

There is only few serializer available like,

org.apache.kafka.common.serialization.StringSerializer

我们如何创建自己的自定义序列化程序?

How can we create our own custom serializer ?

推荐答案

这里有一个示例,可以将您自己的序列化器/反序列化器用于 Kafka 消息值.对于 Kafka 消息密钥是一样的.

Here you have an example to use your own serializer/deserializer for the Kafka message value. For Kafka message key is the same thing.

我们希望将 MyMessage 的序列化版本作为 Kafka 值发送,并再次将其反序列化为消费者端的 MyMessage 对象.

We want to send a serialized version of MyMessage as Kafka value and deserialize it again into a MyMessage object at consumer side.

在生产者端序列化 MyMessage.

Serializing MyMessage in producer side.

您应该创建一个实现 org.apache.kafka.common.serialization.Serializer 的序列化器类

You should create a serializer class that implements org.apache.kafka.common.serialization.Serializer

serialize() 方法完成这项工作,接收您的对象并以字节数组的形式返回序列化版本.

serialize() method do the work, receiving your object and returning a serialized version as bytes array.

public class MyValueSerializer implements Serializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public byte[] serialize(String topic, MyMessage message)
    {
        if (message == null) {
            return null;
        }

        try {

            (serialize your MyMessage object into bytes)

            return bytes;

        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing value", e);
        }
    }

    @Override
    public void close()
    {

    }
}

final IntegerSerializer keySerializer = new IntegerSerializer();
final MyValueSerializer myValueSerializer = new MyValueSerializer();
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);

int messageNo = 1;
int kafkaKey = messageNo;
MyMessage kafkaValue = new MyMessage();
ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));

在消费者端反序列化 MyMessage.

Deserializing MyMessage in consumer side.

您应该创建一个实现 org.apache.kafka.common.serialization.Deserializer 的反序列化器类

You should create a deserializer class that implements org.apache.kafka.common.serialization.Deserializer

deserialize() 方法完成工作,接收序列化值作为字节数组并返回您的对象.

deserialize() method do the work, receiving serialized value as bytes array and returning your object.

public class MyValueDeserializer implements Deserializer<MyMessage>
{
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey)
    {
        this.isKey = isKey;
    }

    @Override
    public MyMessage deserialize(String s, byte[] value)
    {
        if (value == null) {
            return null;
        }

        try {

            (deserialize value into your MyMessage object)

            MyMessage message = new MyMessage();
            return message;

        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing value", e);
        }
    }

    @Override
    public void close()
    {

    }
}

然后像这样使用它:

final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);

ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
for (ConsumerRecord<Integer, MyMessage> record : records) {

    int kafkaKey = record.key();
    MyMessage kafkaValue = record.value();

    ...
}

这篇关于如何在 kafka 中创建自定义序列化程序?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆