Spark: Writing to an Avro file
Question
I am working in Spark and have an RDD loaded from an Avro file. I now want to do some transformations on that RDD and save it back as an Avro file:
val job = new Job(new Configuration())
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema))
rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2))
   .saveAsNewAPIHadoopFile(outputPath,
     classOf[AvroKey[GenericRecord]],
     classOf[org.apache.hadoop.io.NullWritable],
     classOf[AvroKeyOutputFormat[GenericRecord]],
     job.getConfiguration)
When running this, Spark complains that Schema$recordSchema is not serializable.
If I remove the .map call (and just have rdd.saveAsNewAPIHadoopFile), the call succeeds.
What am I doing wrong here? Any ideas?
Answer
The issue here is related to the non-serializability of the avro.Schema class used in the Job. The exception is thrown when you try to reference the schema object from code inside the map function.
For instance, if you try to do the following, you will get a "Task not serializable" exception:
val schema = new Schema.Parser().parse(new File(jsonSchema))
...
rdd.map(t => {
  // reference to the schema object declared outside the closure
  val record = new GenericData.Record(schema)
})
You can make everything work by creating a new instance of the schema inside the function block:
val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.map(t => {
  // create a new Schema object inside the closure
  val innerSchema = new Schema.Parser().parse(new File(jsonSchema))
  val record = new GenericData.Record(innerSchema)
  ...
})
Since you do not want to parse the Avro schema for every record you process, a better solution is to parse the schema once per partition. The following also works:
val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.mapPartitions(tuples => {
  // create a new Schema object once per partition
  val innerSchema = new Schema.Parser().parse(new File(jsonSchema))
  tuples.map(t => {
    val record = new GenericData.Record(innerSchema)
    ...
    // this closure will be bundled together with the outer one
    // (no serialization issues)
  })
})
The code above works as long as you provide a portable reference to the jsonSchema file, since the map function is going to be executed by multiple remote executors. It can be a reference to a file in HDFS, or it can be packaged along with the application in the JAR (in the latter case you would use the class loader to get its contents).
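As a sketch of the latter approach (assuming a schema file bundled under a hypothetical resource path `schemas/record.avsc` inside the application JAR), the schema JSON could be read from the classpath on each executor and parsed once per partition. Using the thread's context class loader avoids capturing the enclosing object in the closure, which would otherwise reintroduce a serialization problem:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import scala.io.Source

// Hypothetical resource path inside the application JAR
val schemaResource = "schemas/record.avsc"

rdd.mapPartitions { tuples =>
  // Load the schema JSON from the classpath on the executor;
  // the context class loader avoids pulling `this` into the closure
  val in = Thread.currentThread().getContextClassLoader
    .getResourceAsStream(schemaResource)
  val innerSchema = new Schema.Parser().parse(Source.fromInputStream(in).mkString)
  tuples.map { t =>
    val record = new GenericData.Record(innerSchema)
    // ... populate the record from t ...
    record
  }
}
```

This is only an illustration of the pattern under the stated assumptions; the resource path and record-population logic depend on your application.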
For those who are trying to use Avro with Spark, note that there are still some unresolved compilation problems, and you have to use the following dependency in your Maven POM:
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.7</version>
  <classifier>hadoop2</classifier>
</dependency>
Note the "hadoop2" classifier. You can track the issue at https://issues.apache.org/jira/browse/SPARK-3039.