如何实现通用 Kafka Streams 反序列化器 [英] How to implement Generic Kafka Streams Deserializer
问题描述
我喜欢 Kafka,但讨厌必须编写大量序列化器/反序列化器,所以我尝试创建一个可以反序列化泛型 T 的 GenericDeserializer
.
I like Kafka, but hate having to write lots of serializers/deserializers, so I tried to create a GenericDeserializer<T>
that could deserialize a generic type T.
这是我的尝试:
class GenericDeserializer< T > implements Deserializer< T > {
static final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
public T deserialize( String topic, byte[] data) {
T result = null;
try {
result = ( T )( objectMapper.readValue( data, T.class ) );
}
catch ( Exception e ) {
e.printStackTrace();
}
return result;
}
@Override
public void close() {
}
}
但是,(Eclipse)Java 编译器抱怨 line
However, the (Eclipse) Java compiler complains about line
result = ( T )( objectMapper.readValue( data, T.class ) );
带有消息 Illegal class literal for the type parameter T
.
问题:
- 你能解释一下这条消息的含义吗?
- 有没有办法解决这个问题以获得预期的效果?
推荐答案
您可以使用 com.fasterxml.jackson.core.type
包中的 TypeReference 实现通用反序列化
you can achieve generic deserialization by using TypeReference from package com.fasterxml.jackson.core.type
public class KafkaGenericDeserializer<T> implements Deserializer<T> {
private final ObjectMapper mapper;
private final TypeReference<T> typeReference;
public KafkaGenericDeserializer(ObjectMapper mapper, TypeReference<T> typeReference) {
this.mapper = mapper;
this.typeReference = typeReference;
}
@Override
public T deserialize(final String topic, final byte[] data) {
if (data == null) {
return null;
}
try {
return mapper.readValue(data, typeReference);
} catch (final IOException ex) {
throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) + "] from topic [" + topic + "]", ex);
}
}
@Override
public void close() {}
@Override
public void configure(final Map<String, ?> settings, final boolean isKey) {}
}
使用这样的通用解串器,你可以创建Serge
:
using such generic deserializer, you can create Serge
:
public static <T> Serde<T> createSerdeWithGenericDeserializer(TypeReference<T> typeReference) {
KafkaGenericDeserializer<T> kafkaGenericDeserializer = new KafkaGenericDeserializer<>(objectMapper, typeReference);
return Serdes.serdeFrom(new JsonSerializer<>(), kafkaGenericDeserializer);
}
这里JsonSerializer
来自spring-kafka
依赖,或者实现自己的序列化.
here JsonSerializer
is from spring-kafka
dependency, or implement your own serialization.
之后,您可以在 Kafka 流创建期间使用 serde:
after that you can use serde during Kafka stream creation:
TypeReference<YourGenericClass<SpecificClass>> typeReference = new TypeReference<YourGenericClass<SpecificClass>>() {};
Serde<YourGenericClass<SpecificClass>> itemSerde = createSerdeWithGenericDeserializer(typeReference);
Consumed<String, YourGenericClass<SpecificClass>> consumed = Consumed.with(Serdes.String(), itemSerde);
streamsBuilder.stream(topicName, consumed);
这篇关于如何实现通用 Kafka Streams 反序列化器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!