How to infer an Avro schema from a Kafka topic in Apache Beam KafkaIO

Question

I'm using Apache Beam's KafkaIO to read from a topic whose values are Avro-encoded with a schema stored in Confluent Schema Registry. I can deserialize the messages and write them to files, but ultimately I want to write to BigQuery, and my pipeline can't infer the schema. How do I extract or infer the schema and attach it to the data in the pipeline so that downstream steps (the BigQuery write) can use it?

Here is the code where I use the schema registry URL to set up the deserializer and read from Kafka:

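```java
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// `options`, `pipeline` and `Utils` come from the surrounding pipeline code.
String schemaUrl = options.getSchemaRegistryUrl().get();
String subj = options.getSubject().get();

// Extra Kafka consumer properties; pass the registry URL as a plain String.
Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaUrl);

// Deserializes values using the schema registered under `subj`.
ConfluentSchemaRegistryDeserializerProvider<GenericRecord> valDeserializerProvider =
        ConfluentSchemaRegistryDeserializerProvider.of(schemaUrl, subj);

pipeline
        .apply("Read from Kafka",
                KafkaIO
                        .<byte[], GenericRecord>read()
                        .withBootstrapServers(options.getKafkaBrokers().get())
                        .withTopics(Utils.getListFromString(options.getKafkaTopics()))
                        .withConsumerConfigUpdates(consumerConfig)
                        .withValueDeserializer(valDeserializerProvider)
                        .withKeyDeserializer(ByteArrayDeserializer.class)
                        .commitOffsetsInFinalize()
                        .withoutMetadata());
```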

I initially thought this would be enough for Beam to infer the schema, but it is not: hasSchema() on the resulting PCollection returns false.

Any help would be appreciated.

Answer

There is ongoing work in KafkaIO to support inferring Avro schemas stored in Confluent Schema Registry. In the meantime, you can already do this in your own pipeline code.
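A minimal sketch of that user-code approach follows. It assumes the latest schema version registered under the subject can read every record on the topic, and it assumes Beam's Avro schema utilities live at `org.apache.beam.sdk.schemas.utils.AvroUtils` (newer Beam releases move them to `org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils`). The sketch fetches the Avro schema from the registry with Confluent's `CachedSchemaRegistryClient` while the pipeline is being built, converts it to a Beam schema, maps each `GenericRecord` to a `Row` carrying that schema, and lets `BigQueryIO` derive the table schema through `useBeamSchema()`. The class `RegistrySchemaSketch`, its method names, and the table spec you pass in are hypothetical.

```java
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import java.io.IOException;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

/** Hypothetical helper: attaches a registry-derived Beam schema to Kafka Avro values. */
final class RegistrySchemaSketch {

  private RegistrySchemaSketch() {}

  /** Fetches the latest Avro schema registered under {@code subject} (runs at build time). */
  static org.apache.avro.Schema latestAvroSchema(String registryUrl, String subject)
      throws IOException, RestClientException {
    // 100 = max number of schemas the client caches locally.
    SchemaRegistryClient client = new CachedSchemaRegistryClient(registryUrl, 100);
    String json = client.getLatestSchemaMetadata(subject).getSchema();
    return new org.apache.avro.Schema.Parser().parse(json);
  }

  /** Converts an Avro schema definition (JSON) into the equivalent Beam schema. */
  static Schema beamSchemaFromAvroJson(String avroJson) {
    return AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(avroJson));
  }

  /** Drops the Kafka keys, converts values to schema'd Rows and writes them to BigQuery. */
  static void writeToBigQuery(
      PCollection<KV<byte[], GenericRecord>> kafkaRecords,
      org.apache.avro.Schema avroSchema,
      String tableSpec) {
    // Assumption: every record on the topic is readable with this one schema version.
    Schema beamSchema = AvroUtils.toBeamSchema(avroSchema);
    PCollection<Row> rows =
        kafkaRecords
            .apply("Drop keys", Values.<GenericRecord>create())
            .apply(
                "To Row",
                MapElements.into(TypeDescriptors.rows())
                    .via(AvroUtils.getGenericRecordToRowFunction(beamSchema)))
            // Attaching the schema makes hasSchema() true on this PCollection.
            .setRowSchema(beamSchema);
    rows.apply(
        "Write to BigQuery",
        BigQueryIO.<Row>write()
            .to(tableSpec)
            .useBeamSchema() // derive the BigQuery table schema from the Row schema
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
  }
}
```

To use it, you would call `latestAvroSchema(schemaUrl, subj)` before building the pipeline, keep the `PCollection<KV<byte[], GenericRecord>>` returned by the KafkaIO read, and pass both to `writeToBigQuery` with a table spec of the form `project:dataset.table`. If you would rather keep `GenericRecord` elements instead of converting to `Row`, setting the values' coder to `AvroUtils.schemaCoder(avroSchema)` should also make `hasSchema()` return true.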
