如何实现通用 Kafka Streams 反序列化器 [英] How to implement Generic Kafka Streams Deserializer

查看:57
本文介绍了如何实现通用 Kafka Streams 反序列化器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我喜欢 Kafka,但讨厌必须编写大量序列化器/反序列化器,所以我尝试创建一个可以反序列化泛型 T 的 GenericDeserializer.

I like Kafka, but hate having to write lots of serializers/deserializers, so I tried to create a GenericDeserializer<T> that could deserialize a generic type T.

这是我的尝试:

class GenericDeserializer< T > implements Deserializer< T > {
    static final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }
    @Override
    public T deserialize( String topic, byte[] data) {
            T result = null;
            try {
                    result = ( T )( objectMapper.readValue( data, T.class ) );
            }
            catch ( Exception e ) {
                    e.printStackTrace();
            }
            return result;
    }
    @Override
    public void close() {
    }
}

但是,(Eclipse)Java 编译器抱怨 line

However, the (Eclipse) Java compiler complains about line

result = ( T )( objectMapper.readValue( data, T.class ) );

带有消息 Illegal class literal for the type parameter T.

问题:

  1. 你能解释一下这条消息的含义吗?
  2. 有没有办法解决这个问题以获得预期的效果?

推荐答案

您可以使用 com.fasterxml.jackson.core.type 包中的 TypeReference 实现通用反序列化

you can achieve generic deserialization by using TypeReference from package com.fasterxml.jackson.core.type

public class KafkaGenericDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper mapper;
    private final TypeReference<T> typeReference;

    public KafkaGenericDeserializer(ObjectMapper mapper, TypeReference<T> typeReference) {
        this.mapper = mapper;
        this.typeReference = typeReference;
    }

    @Override
    public T deserialize(final String topic, final byte[] data) {
        if (data == null) {
            return null;
        }

        try {
            return mapper.readValue(data, typeReference);
        } catch (final IOException ex) {
            throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) + "] from topic [" + topic + "]", ex);
        }
    }

    @Override
    public void close() {}

    @Override
    public void configure(final Map<String, ?> settings, final boolean isKey) {}
}

使用这样的通用解串器,你可以创建Serge:

using such generic deserializer, you can create Serge:

public static <T> Serde<T> createSerdeWithGenericDeserializer(TypeReference<T> typeReference) {
    KafkaGenericDeserializer<T> kafkaGenericDeserializer = new KafkaGenericDeserializer<>(objectMapper, typeReference);
    return Serdes.serdeFrom(new JsonSerializer<>(), kafkaGenericDeserializer);
}

这里JsonSerializer来自spring-kafka依赖,或者实现自己的序列化.

here JsonSerializer is from spring-kafka dependency, or implement your own serialization.

之后,您可以在 Kafka 流创建期间使用 serde:

after that you can use serde during Kafka stream creation:

TypeReference<YourGenericClass<SpecificClass>> typeReference = new TypeReference<YourGenericClass<SpecificClass>>() {};
Serde<YourGenericClass<SpecificClass>> itemSerde = createSerdeWithGenericDeserializer(typeReference);
Consumed<String, YourGenericClass<SpecificClass>> consumed = Consumed.with(Serdes.String(), itemSerde);
streamsBuilder.stream(topicName, consumed);

这篇关于如何实现通用 Kafka Streams 反序列化器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆