Avro Schema to Spark StructType
Question
This is effectively the same as my previous question, but using Avro rather than JSON as the data format.
I'm working with a Spark dataframe which could be loading data from one of a few different schema versions:
// Version One
{"namespace": "com.example.avro",
"type": "record",
"name": "MeObject",
"fields": [
{"name": "A", "type": ["null", "int"], "default": null}
]
}
// Version Two
{"namespace": "com.example.avro",
"type": "record",
"name": "MeObject",
"fields": [
{"name": "A", "type": ["null", "int"], "default": null},
{"name": "B", "type": ["null", "int"], "default": null}
]
}
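The null defaults are what make the two versions interchangeable: a Version One record read under the Version Two schema simply gets B = null. The following stdlib-only sketch mimics that resolution rule for illustration; it is not the real Avro reader:

```python
import json

# The "fields" array from the Version Two schema above.
version_two_fields = json.loads(
    '[{"name": "A", "type": ["null", "int"], "default": null},'
    ' {"name": "B", "type": ["null", "int"], "default": null}]'
)

def resolve(record, fields):
    # Mimics Avro reader-schema resolution: fields missing from the
    # written record take their declared default.
    return {f["name"]: record.get(f["name"], f["default"]) for f in fields}

old_record = {"A": 42}  # written with the Version One schema
print(resolve(old_record, version_two_fields))  # → {'A': 42, 'B': None}
```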
I'm using Spark Avro to load the data.
DataFrame df = context.read()
.format("com.databricks.spark.avro")
.load("path/to/avro/file");
which may be a Version One file or a Version Two file. However, I'd like to be able to process it in an identical manner, with the unknown values set to null. The recommendation in my previous question was to set the schema, but I don't want to repeat myself by writing the schema both in an .avro file and as a Spark StructType and friends. How can I convert the Avro schema (either the text file or the generated MeObject.getClassSchema()) into Spark's StructType?
Spark Avro has a SchemaConverters, but it is all private and returns some strange internal object.
Answer
Disclaimer: It's kind of a dirty hack. It depends on a few things:
- Python provides a lightweight Avro processing library and, due to its dynamism, it doesn't require typed writers
- an empty Avro file is still a valid document
- a Spark schema can be converted to and from JSON
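The first point is easy to see with nothing but the standard library: an .avsc file is ordinary JSON, which is what makes the lightweight-Python approach workable. A minimal inspection of the Version Two schema from the question:

```python
import json

# The Version Two schema from the question, as it appears in the .avsc file.
version_two = """
{"namespace": "com.example.avro",
 "type": "record",
 "name": "MeObject",
 "fields": [
     {"name": "A", "type": ["null", "int"], "default": null},
     {"name": "B", "type": ["null", "int"], "default": null}
 ]
}
"""

schema = json.loads(version_two)
# Every field is a ["null", ...] union with a null default, which is what
# lets readers of older files fill the missing columns with null.
names = [f["name"] for f in schema["fields"]]
print(names)  # → ['A', 'B']
```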
The following code reads an Avro schema file, creates an empty Avro file with the given schema, reads it back using spark-avro, and writes the resulting Spark schema out as a JSON file.
import argparse
import tempfile

import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter

from pyspark import SparkContext
from pyspark.sql import SQLContext


def parse_schema(path):
    with open(path) as fr:
        return avro.schema.parse(fr.read())


def write_dummy(schema):
    # An Avro container file with zero records is still valid and carries
    # the full schema in its header.
    tmp = tempfile.mktemp(suffix='.avro')
    with open(tmp, 'wb') as fw:  # DataFileWriter needs a binary file object
        writer = DataFileWriter(fw, DatumWriter(), schema)
        writer.close()
    return tmp


def write_spark_schema(path, schema):
    with open(path, 'w') as fw:
        fw.write(schema.json())


def main():
    parser = argparse.ArgumentParser(description='Avro schema converter')
    parser.add_argument('--schema')
    parser.add_argument('--output')
    args = parser.parse_args()

    sc = SparkContext('local[1]', 'Avro schema converter')
    sqlContext = SQLContext(sc)

    df = (sqlContext.read.format('com.databricks.spark.avro')
        .load(write_dummy(parse_schema(args.schema))))
    write_spark_schema(args.output, df.schema)

    sc.stop()


if __name__ == '__main__':
    main()
Usage:
bin/spark-submit --packages com.databricks:spark-avro_2.10:2.0.1 \
avro_to_spark_schema.py \
--schema path_to_avro_schema.avsc \
--output path_to_spark_schema.json
Reading the schema back in Scala:
import scala.io.Source
import org.apache.spark.sql.types.{DataType, StructType}
val json: String = Source.fromFile("schema.json").getLines.toList.head
val schema: StructType = DataType.fromJson(json).asInstanceOf[StructType]
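The schema JSON is plain text, so it can also be consumed from PySpark. The snippet below parses a sample of the converter's output with only the standard library; the StructType.fromJson call, which is the standard PySpark counterpart of the Scala code above, is left as a comment so the sketch doesn't require a live Spark session:

```python
import json

# A sample of the schema JSON the converter writes; in a real run this
# would be read from the --output file.
schema_json = (
    '{"type":"struct","fields":['
    '{"name":"A","type":"integer","nullable":true,"metadata":{}},'
    '{"name":"B","type":"integer","nullable":true,"metadata":{}}]}'
)

parsed = json.loads(schema_json)

# With Spark available, the same dict becomes a StructType:
#   from pyspark.sql.types import StructType
#   schema = StructType.fromJson(parsed)
#   df = sqlContext.read.format('com.databricks.spark.avro') \
#       .schema(schema).load('path/to/avro/file')
print([f["name"] for f in parsed["fields"]])  # → ['A', 'B']
```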