How to implement a generic Kafka Streams deserializer
Question
I like Kafka, but I hate having to write lots of serializers and deserializers, so I tried to create a GenericDeserializer<T> that could deserialize a generic type T.
Here is my attempt:
class GenericDeserializer< T > implements Deserializer< T > {
    static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public T deserialize( String topic, byte[] data) {
        T result = null;
        try {
            result = ( T )( objectMapper.readValue( data, T.class ) );
        }
        catch ( Exception e ) {
            e.printStackTrace();
        }
        return result;
    }

    @Override
    public void close() {
    }
}
However, the (Eclipse) Java compiler complains about the line
    result = ( T )( objectMapper.readValue( data, T.class ) );
with the message: Illegal class literal for the type parameter T.
Questions:
- Can you explain what the message means?
- Is there any way to fix this and get the intended effect?
Recommended answer
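The expression T.class is rejected because Java erases generic type parameters during compilation, so no class literal exists for T. You can still achieve generic deserialization by passing the target type explicitly, using TypeReference from the package com.fasterxml.jackson.core.type: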
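import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

public class KafkaGenericDeserializer<T> implements Deserializer<T> {
    private final ObjectMapper mapper;
    // Carries the full generic type of T, which is otherwise erased at runtime.
    private final TypeReference<T> typeReference;

    public KafkaGenericDeserializer(ObjectMapper mapper, TypeReference<T> typeReference) {
        this.mapper = mapper;
        this.typeReference = typeReference;
    }

    @Override
    public T deserialize(final String topic, final byte[] data) {
        // Kafka passes null for tombstone records or missing keys.
        if (data == null) {
            return null;
        }
        try {
            return mapper.readValue(data, typeReference);
        } catch (final IOException ex) {
            throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) + "] from topic [" + topic + "]", ex);
        }
    }

    @Override
    public void close() {}

    @Override
    public void configure(final Map<String, ?> settings, final boolean isKey) {}
}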
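For intuition on why a TypeReference works where T.class does not, here is a minimal sketch of the "super type token" idea, using only the JDK. TypeToken and TypeTokenDemo are hypothetical names made up for this illustration; this is not Jackson's actual implementation, only the same general technique. An anonymous subclass records its generic superclass in its class metadata, and that information can be read back through reflection.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;

public class TypeTokenDemo {

    // Hypothetical stand-in for a type reference: abstract, so callers must
    // create a subclass, and the subclass keeps T in its class metadata.
    public abstract static class TypeToken<T> {
        private final Type type;

        protected TypeToken() {
            Type superclass = getClass().getGenericSuperclass();
            if (!(superclass instanceof ParameterizedType)) {
                // Happens when the token is created as a raw type.
                throw new IllegalStateException("TypeToken needs a type argument");
            }
            this.type = ((ParameterizedType) superclass).getActualTypeArguments()[0];
        }

        public Type getType() {
            return type;
        }
    }

    public static void main(String[] args) {
        // The trailing {} creates an anonymous subclass of TypeToken.
        TypeToken<Map<String, List<Integer>>> token =
                new TypeToken<Map<String, List<Integer>>>() {};

        ParameterizedType captured = (ParameterizedType) token.getType();
        System.out.println("raw type:  " + captured.getRawType());
        System.out.println("first arg: " + captured.getActualTypeArguments()[0]);
    }
}
```

The full nested type, including the List<Integer> argument, is available at runtime here, which is the kind of information a JSON library needs in order to build the right objects.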
With such a generic deserializer, you can create a Serde:
// objectMapper is assumed to be a shared ObjectMapper instance defined in the enclosing class.
public static <T> Serde<T> createSerdeWithGenericDeserializer(TypeReference<T> typeReference) {
    KafkaGenericDeserializer<T> kafkaGenericDeserializer = new KafkaGenericDeserializer<>(objectMapper, typeReference);
    return Serdes.serdeFrom(new JsonSerializer<>(), kafkaGenericDeserializer);
}
Here JsonSerializer comes from the spring-kafka dependency; alternatively, you can implement your own serializer.
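If you would rather not depend on spring-kafka, a hand-written serializer could look roughly like the sketch below. KafkaGenericSerializer is a hypothetical name chosen to mirror the deserializer above; the sketch assumes that kafka-clients and jackson-databind are on the classpath and that plain JSON bytes are an acceptable wire format.

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

// Hypothetical counterpart to KafkaGenericDeserializer: writes any value as JSON bytes.
public class KafkaGenericSerializer<T> implements Serializer<T> {
    private final ObjectMapper mapper;

    public KafkaGenericSerializer(ObjectMapper mapper) {
        this.mapper = mapper;
    }

    @Override
    public byte[] serialize(final String topic, final T data) {
        // Keep null as null so tombstone records pass through unchanged.
        if (data == null) {
            return null;
        }
        try {
            return mapper.writeValueAsBytes(data);
        } catch (final JsonProcessingException ex) {
            throw new SerializationException("Can't serialize data for topic [" + topic + "]", ex);
        }
    }

    @Override
    public void configure(final Map<String, ?> settings, final boolean isKey) {}

    @Override
    public void close() {}
}
```

No TypeReference is needed on the writing side, because the runtime class of the object being serialized is already known to Jackson.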
After that, you can use the Serde when creating the Kafka stream:
TypeReference<YourGenericClass<SpecificClass>> typeReference = new TypeReference<YourGenericClass<SpecificClass>>() {};
Serde<YourGenericClass<SpecificClass>> itemSerde = createSerdeWithGenericDeserializer(typeReference);
Consumed<String, YourGenericClass<SpecificClass>> consumed = Consumed.with(Serdes.String(), itemSerde);
streamsBuilder.stream(topicName, consumed);