如何在kafka中创建自定义序列化程序? [英] How to create Custom serializer in kafka?
问题描述
只有很少的序列化器可用,
There is only few serializer available like,
org.apache.kafka.common.serialization.StringSerializer
org.apache.kafka.common.serialization.StringSerializer
我们如何创建自己的自定义序列化器?
How can we create our own custom serializer ?
推荐答案
这里有一个示例,可以使用您自己的序列化器/解串器来获取Kafka消息值。对于Kafka,消息密钥是相同的。
Here you have an example to use your own serializer/deserializer for the Kafka message value. For Kafka message key is the same thing.
我们希望将MyMessage的序列化版本作为Kafka值发送,并将其再次反序列化为消费者端的MyMessage对象。
We want to send a serialized version of MyMessage as Kafka value and deserialize it again into a MyMessage object at consumer side.
在生产者端序列化MyMessage。
Serializing MyMessage in producer side.
您应该创建一个实现org.apache.kafka.common的序列化程序类。 serialization.Serializer
You should create a serializer class that implements org.apache.kafka.common.serialization.Serializer
serialize()方法完成工作,接收对象并返回序列化版本作为字节数组。
serialize() method do the work, receiving your object and returning a serialized version as bytes array.
public class MyValueSerializer implements Serializer<MyMessage>
{
private boolean isKey;
@Override
public void configure(Map<String, ?> configs, boolean isKey)
{
this.isKey = isKey;
}
@Override
public byte[] serialize(String topic, MyMessage message)
{
if (message == null) {
return null;
}
try {
(serialize your MyMessage object into bytes)
return bytes;
} catch (IOException | RuntimeException e) {
throw new SerializationException("Error serializing value", e);
}
}
@Override
public void close()
{
}
}
final IntegerSerializer keySerializer = new IntegerSerializer();
final MyValueSerializer myValueSerializer = new MyValueSerializer();
final KafkaProducer<Integer, MyMessage> producer = new KafkaProducer<>(props, keySerializer, myValueSerializer);
int messageNo = 1;
int kafkaKey = messageNo;
MyMessage kafkaValue = new MyMessage();
ProducerRecord producerRecord = new ProducerRecord<>(topic, kafkaKey, kafkaValue);
producer.send(producerRecord, new DemoCallBack(logTag, startTime, messageNo, strValue));
在消费者方面反序列化MyMessage。
Deserializing MyMessage in consumer side.
您应该创建一个实现org.apache.kafka.common.serialization.Deserializer的反序列化器类
You should create a deserializer class that implements org.apache.kafka.common.serialization.Deserializer
deserialize()方法完成工作,接收序列化值作为字节数组并返回您的对象。
deserialize() method do the work, receiving serialized value as bytes array and returning your object.
public class MyValueDeserializer implements Deserializer<MyMessage>
{
private boolean isKey;
@Override
public void configure(Map<String, ?> configs, boolean isKey)
{
this.isKey = isKey;
}
@Override
public MyMessage deserialize(String s, byte[] value)
{
if (value == null) {
return null;
}
try {
(deserialize value into your MyMessage object)
MyMessage message = new MyMessage();
return message;
} catch (IOException | RuntimeException e) {
throw new SerializationException("Error deserializing value", e);
}
}
@Override
public void close()
{
}
}
然后像这样使用它:
final IntegerDeserializer keyDeserializer = new IntegerDeserializer();
final MyValueDeserializer myValueDeserializer = new MyValueDeserializer();
final KafkaConsumer<Integer, MyMessage> consumer = new KafkaConsumer<>(props, keyDeserializer, myValueDeserializer);
ConsumerRecords<Integer, MyMessage> records = consumer.poll(1000);
for (ConsumerRecord<Integer, MyMessage> record : records) {
int kafkaKey = record.key();
MyMessage kafkaValue = record.value();
...
}
这篇关于如何在kafka中创建自定义序列化程序?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!