How to infer an Avro schema from a Kafka topic in Apache Beam KafkaIO

Question

I'm using Apache Beam's KafkaIO to read from a topic whose Avro schema is stored in the Confluent Schema Registry. I'm able to deserialize the messages and write them to files, but ultimately I want to write to BigQuery, and my pipeline isn't able to infer the schema. How do I extract or infer the schema and attach it to the data in the pipeline so that my downstream step (writing to BigQuery) can use it?

Here is the code where I use the Schema Registry URL to set the deserializer and where I read from Kafka:

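    // Imports used by this snippet (KafkaIO with DeserializerProvider support).
    import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    // Registry URL and value subject come from ValueProvider-typed pipeline options.
    String schemaUrl = options.getSchemaRegistryUrl().get();
    String subj = options.getSubject().get();

    // consumerConfig is a Map<String, Object> declared elsewhere; with
    // commitOffsetsInFinalize() it must also contain group.id.
    // Pass the resolved String, not the ValueProvider itself.
    consumerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaUrl);

    // Deserializes values into GenericRecord using the schema registered under subj.
    ConfluentSchemaRegistryDeserializerProvider<GenericRecord> valDeserializerProvider =
            ConfluentSchemaRegistryDeserializerProvider.of(schemaUrl, subj);

    pipeline
            .apply("Read from Kafka",
                    KafkaIO.<byte[], GenericRecord>read()
                            .withBootstrapServers(options.getKafkaBrokers().get())
                            // Utils.getListFromString is the asker's own helper.
                            .withTopics(Utils.getListFromString(options.getKafkaTopics()))
                            .withConsumerConfigUpdates(consumerConfig)
                            .withKeyDeserializer(ByteArrayDeserializer.class)
                            .withValueDeserializer(valDeserializerProvider)
                            .commitOffsetsInFinalize()
                            // Output: PCollection<KV<byte[], GenericRecord>>
                            .withoutMetadata());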

I initially thought this would be enough for Beam to infer the schema, but it isn't: hasSchema() on the resulting PCollection returns false.

Any help would be appreciated.

Answer

There is ongoing work to support inferring the Avro schema stored in the Confluent Schema Registry directly in KafkaIO. In the meantime, it is already possible to do this in user pipeline code.
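A minimal sketch of that user-code approach follows. It assumes a Beam 2.2x-era SDK where `AvroUtils` lives in `org.apache.beam.sdk.schemas.utils` (newer releases moved it to the Avro extension module, `org.apache.beam.sdk.extensions.avro.schemas.utils`), plus the Confluent schema registry client and Beam's GCP IO module on the classpath. The idea: fetch the latest Avro schema for the subject with `CachedSchemaRegistryClient`, drop the Kafka keys, and replace the `PCollection`'s default `AvroCoder` with `AvroUtils.schemaCoder(avroSchema)`. Because that is a `SchemaCoder`, `hasSchema()` becomes true and `BigQueryIO.write().useBeamSchema()` can derive the table schema from it. The class name, the `fetchLatestAvroSchema` helper, and the broker, topic, subject and table strings are hypothetical placeholders.

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.IOException;
import java.util.Collections;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class KafkaAvroToBigQuery {

  // Hypothetical helper: looks up the latest Avro schema registered under `subject`.
  static Schema fetchLatestAvroSchema(String registryUrl, String subject)
      throws IOException, RestClientException {
    // 100 = how many schemas the client may cache locally.
    CachedSchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
    SchemaMetadata metadata = client.getLatestSchemaMetadata(subject);
    return new Schema.Parser().parse(metadata.getSchema());
  }

  public static void main(String[] args) throws Exception {
    // Hypothetical placeholders; in the question these come from pipeline options.
    String registryUrl = "http://localhost:8081";
    String subject = "my_topic-value";
    String brokers = "localhost:9092";
    String topic = "my_topic";
    String table = "my-project:my_dataset.my_table";

    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Resolve the Avro schema once, at pipeline-construction time.
    Schema avroSchema = fetchLatestAvroSchema(registryUrl, subject);

    PCollection<GenericRecord> records =
        pipeline
            .apply("Read from Kafka",
                KafkaIO.<byte[], GenericRecord>read()
                    .withBootstrapServers(brokers)
                    .withTopics(Collections.singletonList(topic))
                    .withKeyDeserializer(ByteArrayDeserializer.class)
                    .withValueDeserializer(
                        ConfluentSchemaRegistryDeserializerProvider.of(registryUrl, subject))
                    .withoutMetadata())
            // KV<byte[], GenericRecord> -> GenericRecord
            .apply("Drop keys", Values.create())
            // Swap the plain AvroCoder for a SchemaCoder so hasSchema() returns true.
            .setCoder(AvroUtils.schemaCoder(avroSchema));

    records.apply("Write to BigQuery",
        BigQueryIO.<GenericRecord>write()
            .to(table)
            // Derive the BigQuery table schema and row conversion from the Beam schema.
            .useBeamSchema()
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    pipeline.run();
  }
}
```

Since the schema is resolved once when the pipeline is constructed, the BigQuery table schema reflects the version of the subject that was latest when the job was launched; picking up a later version would mean updating or restarting the job.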
