How to convert nested avro GenericRecord to Row
Question

I have some code that converts my avro record to a Row using the function avroToRowConverter():
directKafkaStream.foreachRDD(rdd -> {
    JavaRDD<Row> newRDD = rdd.map(x -> {
        Injection<GenericRecord, byte[]> recordInjection =
            GenericAvroCodecs.toBinary(SchemaRegstryClient.getLatestSchema("poc2"));
        return avroToRowConverter(recordInjection.invert(x._2).get());
    });
});
This function is not working for nested schemas (TYPE = UNION).
private static Row avroToRowConverter(GenericRecord avroRecord) {
    if (null == avroRecord) {
        return null;
    }
    Object[] objectArray = new Object[avroRecord.getSchema().getFields().size()];
    StructType structType = (StructType) SchemaConverters.toSqlType(avroRecord.getSchema()).dataType();
    for (Schema.Field field : avroRecord.getSchema().getFields()) {
        // Copy string-like fields as Java Strings; everything else is copied as-is
        if (field.schema().getType().toString().equalsIgnoreCase("STRING")
                || field.schema().getType().toString().equalsIgnoreCase("ENUM")) {
            objectArray[field.pos()] = "" + avroRecord.get(field.pos());
        } else {
            objectArray[field.pos()] = avroRecord.get(field.pos());
        }
    }
    return new GenericRowWithSchema(objectArray, structType);
}
Can anyone suggest how I can convert a complex schema to a Row?
Answer

There is SchemaConverters.createConverterToSQL, but unfortunately it is private. There are PRs to make it public, but they were never merged:
- https://github.com/databricks/spark-avro/pull/89
- https://github.com/databricks/spark-avro/pull/132
There is a workaround, though, that we used. You can expose it by creating a class in the com.databricks.spark.avro package:
package com.databricks.spark.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType

object MySchemaConversions {
  def createConverterToSQL(avroSchema: Schema, sparkSchema: DataType): (GenericRecord) => Row =
    SchemaConverters.createConverterToSQL(avroSchema, sparkSchema).asInstanceOf[(GenericRecord) => Row]
}
Then you can use it in your code like this:
final DataType myAvroType = SchemaConverters.toSqlType(MyAvroRecord.getClassSchema()).dataType();
final Function1<GenericRecord, Row> myAvroRecordConverter =
    MySchemaConversions.createConverterToSQL(MyAvroRecord.getClassSchema(), myAvroType);

Row[] convertAvroRecordsToRows(List<GenericRecord> records) {
    return records.stream().map(myAvroRecordConverter::apply).toArray(Row[]::new);
}
For a single record, you can just call it like this:
final Row row = myAvroRecordConverter.apply(record);