Task Not Serializable exception when trying to write an RDD of type GenericRecord
Problem description
val file = File.createTempFile("temp", ".avro")
val schema = new Schema.Parser().parse(st)
val datumWriter = new GenericDatumWriter[GenericData.Record](schema)
val dataFileWriter = new DataFileWriter[GenericData.Record](datumWriter)
dataFileWriter.create(schema, file)
rdd.foreach(r => {
  dataFileWriter.append(r)
})
dataFileWriter.close()
I have a DStream of type GenericData.Record which I am trying to write to HDFS in the Avro format, but I'm getting this Task Not Serializable error:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2062)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)
at KafkaCo$$anonfun$main$3.apply(KafkaCo.scala:217)
at KafkaCo$$anonfun$main$3.apply(KafkaCo.scala:210)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.apache.avro.file.DataFileWriter
Serialization stack:
- object not serializable (class: org.apache.avro.file.DataFileWriter, value: org.apache.avro.file.DataFileWriter@78f132d9)
- field (class: KafkaCo$$anonfun$main$3$$anonfun$apply$1, name: dataFileWriter$1, type: class org.apache.avro.file.DataFileWriter)
- object (class KafkaCo$$anonfun$main$3$$anonfun$apply$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
Recommended answer
The key point here is that the DataFileWriter is a local resource (bound to a local file), so serializing it does not make sense.
Adapting the code to do things like mapPartitions will not help either, as such an executor-bound approach writes files to the local filesystem of the executors, not to HDFS.
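For illustration only, such a per-partition variant might look like the sketch below (it reuses the schema string st and the RDD from the question). It avoids the serialization error, because the writer is created on the executor, but every executor then writes its own Avro file to its own local disk rather than to HDFS:

import java.io.File
import org.apache.avro.Schema
import org.apache.avro.file.DataFileWriter
import org.apache.avro.generic.{GenericData, GenericDatumWriter}

// Sketch only: the writer now lives entirely inside the partition closure,
// so nothing non-serializable is captured by the task, but the output
// stays on each executor's local filesystem.
rdd.foreachPartition { records =>
  val schema = new Schema.Parser().parse(st)
  val datumWriter = new GenericDatumWriter[GenericData.Record](schema)
  val dataFileWriter = new DataFileWriter[GenericData.Record](datumWriter)
  val localFile = File.createTempFile("partition", ".avro")
  dataFileWriter.create(schema, localFile)
  try {
    records.foreach(r => dataFileWriter.append(r))
  } finally {
    dataFileWriter.close()
  }
}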
We need to use an implementation that supports the distributed nature of Spark, for example, https://github.com/databricks/spark-avro
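If the library is not already on the classpath, it would typically be added as an sbt dependency along these lines (the version shown here is only an assumption; the project's README lists which release matches each Spark version):

libraryDependencies += "com.databricks" %% "spark-avro" % "3.2.0"  // version is an assumption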
Using that library:
Given some schema represented by a case class, we would do:
import com.databricks.spark.avro._  // provides df.write.avro(...)
import spark.implicits._            // provides .toDF(); use sqlContext.implicits._ on Spark 1.x

val structuredRDD = rdd.map(record => recordToSchema(record))
val df = structuredRDD.toDF()
df.write.avro(hdfs_path)
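The helper recordToSchema is not defined in the answer. Purely as an illustration, and building on the imports in the snippet above, it could map the Avro record's fields into a hypothetical case class, and for the DStream from the question the same logic would run once per micro-batch inside foreachRDD:

// Hypothetical sketch: User and recordToSchema are illustrative, not part of the answer.
case class User(id: String, name: String)

def recordToSchema(record: GenericData.Record): User =
  User(record.get("id").toString, record.get("name").toString)

// Applied per micro-batch of the DStream; hdfs_path is the target directory from the answer.
dstream.foreachRDD { rdd =>
  val df = rdd.map(recordToSchema).toDF()
  df.write.mode("append").avro(hdfs_path)
}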