Kafka Streams:POJO序列化/反序列化 [英] Kafka Streams: POJO serialization/deserialization
问题描述
我们可以使用Kafka Streams中的哪些类/方法将Java对象序列化/反序列化为字节数组,反之亦然?以下链接提出了ByteArrayOutputStream&的用法. ObjectOutputStream,但它们不是线程安全的.
What class/method in Kafka Streams can we use to serialize/deserialize Java object to byte array OR vice versa? The following link proposes the usage of ByteArrayOutputStream & ObjectOutputStream but they are not thread safe.
还有一个使用ObjectMapper的选项ObjectReader(用于线程安全),但这是从POJO-> JSON-> bytearray转换而来的.似乎此选项是一个广泛的选项.想要检查是否存在将对象转换为字节数组的直接方法,反之亦然,这是线程安全的.请建议
There is another option to use the ObjectMapper, ObjectReader (for threadsafe), but that's converting from POJO -> JSON -> bytearray. Seems this option is an extensive one. Wanted to check if there is a direct way to translate object into bytearray and vice versa which is threadsafe. Please suggest
import org.apache.kafka.common.serialization.Serializer;
public class HouseSerializer<T> implements Serializer<T>{
private Class<T> tClass;
public HouseSerializer(){
}
@SuppressWarnings("unchecked")
@Override
public void configure(Map configs, boolean isKey) {
tClass = (Class<T>) configs.get("POJOClass");
}
@Override
public void close() {
}
@Override
public byte[] serialize(String topic, T data) {
//Object serialization to be performed here
return null;
}
}
注意:Kafka版本-0.10.1
Note: Kafka version - 0.10.1
推荐答案
想检查是否存在将对象转换为字节数组的直接方法
Wanted to check if there is a direct way to translate object into bytearray
我建议您考虑在Confluent Schema Registry中使用 Avro序列化,如果可能的话,但不是必需的. JSON是一个很好的后备,但需要在线"上占用更多空间,因此 MsgPack 将是那里的替代品.
I would suggest you look at using Avro serialization with the Confluent Schema Registry, if possible, but not required. JSON is a good fall back, but takes more space "on the wire", and so MsgPack would be the alternative there.
上面的示例使用 avro-maven-plugin 从src/main/resources/avro
模式文件生成LogLine类.
Above example is using the avro-maven-plugin to generate a LogLine class from the src/main/resources/avro
schema file.
Otherwise, it's up to you for how to serialize your object into a byte array, for example, a String is commonly packed as
[(length of string) (UTF8 encoded bytes)]
布尔值是单个0或1位
这是线程安全的
which is threadsafe
我了解您的担忧,但是您通常不会在线程之间共享反序列化的数据.您发送/阅读/处理每个独立的消息.
I understand the concern, but you aren't commonly sharing deserialized data between threads. You send/read/process a message for each independent one.
这篇关于Kafka Streams:POJO序列化/反序列化的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!