Spark: Writing to Avro file

Problem Description

I am in Spark, I have an RDD from an Avro file. I now want to do some transformations on that RDD and save it back as an Avro file:

val job = new Job(new Configuration())
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema))

rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2))
   .saveAsNewAPIHadoopFile(outputPath, 
  classOf[AvroKey[GenericRecord]], 
  classOf[org.apache.hadoop.io.NullWritable], 
  classOf[AvroKeyOutputFormat[GenericRecord]], 
  job.getConfiguration)

When running this, Spark complains that Schema$recordSchema is not serializable.

If I remove the .map call (and just have rdd.saveAsNewAPIHadoopFile), the call succeeds.

What am I doing wrong here?

Any ideas?

Answer

The issue here is related to the non-serializability of the avro.Schema class used in the Job. The exception is thrown when you try to reference the schema object from the code inside the map function.

For instance, if you try to do as follows, you will get the "Task not serializable" exception:

val schema = new Schema.Parser().parse(new File(jsonSchema))
...
rdd.map(t => {
  // reference to the schema object declared outside
  val record = new GenericData.Record(schema)
})

You can make everything work by simply creating a new instance of the schema inside the function block:

val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.map(t => {
  // create a new Schema object
  val innerSchema = new Schema.Parser().parse(new File(jsonSchema))
  val record = new GenericData.Record(innerSchema)
  ...
})

Since you would not want to parse the Avro schema for every record you handle, a better solution is to parse the schema once per partition. The following also works:

val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.mapPartitions(tuples => {
  // create a new Schema object
  val innerSchema = new Schema.Parser().parse(new File(jsonSchema))

  tuples.map(t => {
    val record = new GenericData.Record(innerSchema)
    ...
    // this closure will be bundled together with the outer one 
    // (no serialization issues)
  })
})

The code above works as long as you provide a portable reference to the jsonSchema file, since the map function is going to be executed by multiple remote executors. It can be a reference to a file in HDFS or it can be packaged along with the application in the JAR (you will use the class-loader functions to get its contents in the latter case).
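
For example, a minimal sketch of reading the schema from the classpath when it is bundled into the application JAR (the resource name "/schema.avsc" is only an assumption for illustration):

// Read the schema JSON that was packaged with the application JAR.
// "/schema.avsc" is a hypothetical resource name.
val schemaJson = scala.io.Source
  .fromInputStream(getClass.getResourceAsStream("/schema.avsc"))
  .mkString

rdd.mapPartitions(tuples => {
  // schemaJson is a plain String, so it can be captured by the closure safely;
  // the non-serializable Schema object is built on the executors, once per partition
  val innerSchema = new Schema.Parser().parse(schemaJson)
  tuples.map(t => new GenericData.Record(innerSchema))
})

This way only the schema text travels with the closure, and the Schema instance itself is constructed on the executor side.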

For those who are trying to use Avro with Spark, note that there are still some unresolved compilation problems and you have to use the following dependency in the Maven POM:

<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.7</version>
  <classifier>hadoop2</classifier>
</dependency>

Note the "hadoop2" classifier. You can track the issue at https://issues.apache.org/jira/browse/SPARK-3039.
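
If you build with sbt rather than Maven, the equivalent dependency should presumably be declared with the same classifier (a sketch, not taken from the original answer):

// sbt equivalent of the Maven dependency above, using the hadoop2 classifier
libraryDependencies += "org.apache.avro" % "avro-mapred" % "1.7.7" classifier "hadoop2"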
