Is it possible to deserialize an Avro message (consumed from Kafka) without providing a reader schema in ConfluentRegistryAvroDeserializationSchema


Problem description

I am using the Kafka connector in Apache Flink to access streams served by Confluent Kafka.

Apart from the schema registry URL, ConfluentRegistryAvroDeserializationSchema.forGeneric(...) expects a 'reader' schema. Instead of providing a reader schema, I want to use the writer's schema (looked up in the registry) to read the message as well, because the consumer will not have the latest schema.

FlinkKafkaConsumer010<GenericRecord> myConsumer =
    new FlinkKafkaConsumer010<>(
        "topic-name",
        ConfluentRegistryAvroDeserializationSchema.forGeneric(<reader schema goes here>, "http://host:port"),
        properties);
myConsumer.setStartFromLatest();

The Flink documentation (https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html) says: "Using these deserialization schema record will be read with the schema that was retrieved from Schema Registry and transformed to a statically provided [schema]"
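
For context, the statically provided reader schema is usually parsed from an .avsc definition kept on the consumer side. A minimal sketch of that standard usage, with a hypothetical single-field Event schema, looks like this:

import org.apache.avro.Schema;

// Hypothetical reader schema kept on the consumer side; keeping this
// definition in the consumer is exactly what the question wants to avoid.
Schema readerSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Event\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

FlinkKafkaConsumer010<GenericRecord> myConsumer =
    new FlinkKafkaConsumer010<>(
        "topic-name",
        ConfluentRegistryAvroDeserializationSchema.forGeneric(
            readerSchema, "http://host:port"),
        properties);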

Since I do not want to keep a schema definition on the consumer side, how do I deserialize an Avro message from Kafka using the writer's schema?

Appreciate your help!

Solution

I don't think it is possible to use ConfluentRegistryAvroDeserializationSchema.forGeneric directly. It is intended to be used with a reader schema, and there are precondition checks enforcing this.

You have to implement your own. Two important things:

  • Set specific.avro.reader to false (otherwise you'll get specific records).
  • The KafkaAvroDeserializer has to be lazily initialized (because it is not serializable itself, as it holds a reference to the schema registry client).
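
The implementation below wraps a KafkaAvroDeserializer in a Flink KeyedDeserializationSchema and applies both points: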
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

public class KafkaGenericAvroDeserializationSchema
    implements KeyedDeserializationSchema<GenericRecord> {

  private final String registryUrl;
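  // KafkaAvroDeserializer is not serializable (it holds a reference to the
  // schema registry client), so it is transient and initialized lazily.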
  private transient KafkaAvroDeserializer inner;

  public KafkaGenericAvroDeserializationSchema(String registryUrl) {
    this.registryUrl = registryUrl;
  }

  @Override
  public GenericRecord deserialize(
      byte[] messageKey, byte[] message, String topic, int partition, long offset) {
    checkInitialized();
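    // KafkaAvroDeserializer reads the schema id embedded in the Confluent
    // wire format and fetches the writer's schema from the registry, so no
    // reader schema is needed here.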
    return (GenericRecord) inner.deserialize(topic, message);
  }

  @Override
  public boolean isEndOfStream(GenericRecord nextElement) {
    return false;
  }

  @Override
  public TypeInformation<GenericRecord> getProducedType() {
    return TypeExtractor.getForClass(GenericRecord.class);
  }

  private void checkInitialized() {
    if (inner == null) {
      Map<String, Object> props = new HashMap<>();
      props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
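      // false: return GenericRecord instead of generated specific record classes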
      props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
      SchemaRegistryClient client =
          new CachedSchemaRegistryClient(
              registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
      inner = new KafkaAvroDeserializer(client, props);
    }
  }
}

env.addSource(
    new FlinkKafkaConsumer<>(
        topic,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        kafkaProperties));
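
Note that only the registry URL is stored in a field, so the schema registry client is created lazily on each task manager instead of being shipped with the job graph; the key bytes, partition, and offset that KeyedDeserializationSchema provides are simply ignored here.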
