KafkaAvroDeserializer does not return SpecificRecord but returns GenericRecord


Problem description

My KafkaProducer is able to use KafkaAvroSerializer to serialize objects to my topic. However, KafkaConsumer.poll() returns deserialized GenericRecord instead of my serialized class.

MyKafkaProducer

    KafkaProducer<CharSequence, MyBean> producer;
    try (InputStream props = Resources.getResource("producer.props").openStream()) {
      Properties properties = new Properties();
      properties.load(props);
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          io.confluent.kafka.serializers.KafkaAvroSerializer.class);
      properties.put("schema.registry.url", "http://localhost:8081");

      MyBean bean = new MyBean();
      producer = new KafkaProducer<>(properties);
      producer.send(new ProducerRecord<>(topic, bean.getId(), bean));
    }

My KafkaConsumer

    Properties properties = new Properties();
    KafkaConsumer<CharSequence, MyBean> consumer;
    try (InputStream props = Resources.getResource("consumer.props").openStream()) {
      properties.load(props);
      properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
      properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
      properties.put("schema.registry.url", "http://localhost:8081");
      consumer = new KafkaConsumer<>(properties);
    }
    consumer.subscribe(Arrays.asList(topic));
    try {
      while (true) {
        ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
        if (records.isEmpty()) {
          continue;
        }
        for (ConsumerRecord<CharSequence, MyBean> record : records) {
          MyBean bean = record.value(); // <-------- This throws a ClassCastException because GenericRecord cannot be cast to MyBean
          System.out.println("consumer received: " + bean);
        }
      }
    } finally {
      consumer.close();
    }

MyBean bean = record.value(); That line throws a ClassCastException because a GenericRecord cannot be cast to MyBean.

I am using kafka-client-0.9.0.1 and kafka-avro-serializer-3.0.0.

Recommended answer

KafkaAvroDeserializer supports SpecificData

It's not enabled by default. To enable it:

properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
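
For reference, a minimal consumer-side sketch with the specific reader enabled might look like the following. The bootstrap server and group id values are placeholders (the question loads its settings from consumer.props), and this only helps when MyBean implements SpecificRecord, i.e. it was generated by the Avro compiler; for a plain POJO, the rest of the answer extends the deserializer to use ReflectData instead.

// Sketch only: consumer properties with the specific Avro reader enabled.
Properties properties = new Properties();
// Placeholder connection settings; the question reads these from consumer.props.
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
properties.put("schema.registry.url", "http://localhost:8081");
// Tell the deserializer to build SpecificRecord instances instead of GenericRecord.
properties.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

KafkaConsumer<CharSequence, MyBean> consumer = new KafkaConsumer<>(properties);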



KafkaAvroDeserializer does not support ReflectData

Confluent's KafkaAvroDeserializer does not know how to deserialize using Avro ReflectData. I had to extend it to support Avro ReflectData:

import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.kafka.common.errors.SerializationException;

/**
 * Extends deserializer to support ReflectData.
 *
 * @param <V>
 *     value type
 */
public abstract class ReflectKafkaAvroDeserializer<V> extends KafkaAvroDeserializer {

  private Schema readerSchema;
  private DecoderFactory decoderFactory = DecoderFactory.get();

  protected ReflectKafkaAvroDeserializer(Class<V> type) {
    // Derive the reader schema from the target class via ReflectData.
    readerSchema = ReflectData.get().getSchema(type);
  }

  @Override
  protected Object deserialize(
      boolean includeSchemaAndVersion,
      String topic,
      Boolean isKey,
      byte[] payload,
      Schema readerSchemaIgnored) throws SerializationException {

    if (payload == null) {
      return null;
    }

    int schemaId = -1;
    try {
      // Confluent wire format: magic byte, 4-byte schema id, then the Avro-encoded payload.
      ByteBuffer buffer = ByteBuffer.wrap(payload);
      if (buffer.get() != MAGIC_BYTE) {
        throw new SerializationException("Unknown magic byte!");
      }

      schemaId = buffer.getInt();
      Schema writerSchema = schemaRegistry.getByID(schemaId);

      int start = buffer.position() + buffer.arrayOffset();
      int length = buffer.limit() - 1 - idSize; // total length minus magic byte and schema id
      DatumReader<Object> reader = new ReflectDatumReader<>(writerSchema, readerSchema);
      BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
      return reader.read(null, decoder);
    } catch (IOException e) {
      throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
    } catch (RestClientException e) {
      throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
    }
  }
}

Define a custom deserializer class which deserializes to MyBean:

public class MyBeanDeserializer extends ReflectKafkaAvroDeserializer<MyBean> {
  public MyBeanDeserializer() {
    super(MyBean.class);
  }
}

Configure KafkaConsumer to use the custom deserializer class:

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyBeanDeserializer.class);
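
With that in place, Kafka instantiates MyBeanDeserializer through its no-arg constructor and record.value() should come back as a MyBean. A minimal sketch of the resulting consumer loop, reusing the topic and properties names from the question:

// Sketch only: the question's consumer loop, now typed against MyBean
// and decoded by the ReflectData-based deserializer configured above.
KafkaConsumer<CharSequence, MyBean> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topic));

while (true) {
  ConsumerRecords<CharSequence, MyBean> records = consumer.poll(100);
  for (ConsumerRecord<CharSequence, MyBean> record : records) {
    MyBean bean = record.value(); // no ClassCastException: decoded via ReflectDatumReader
    System.out.println("consumer received: " + bean);
  }
}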
