Spark: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
Problem description
I am creating an Avro RDD with the following code:
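import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey

// Builds one record from `schema` and wraps it in an AvroKey.
def convert2Avro(data: String, schema: Schema): AvroKey[GenericRecord] = {
  val wrapper = new AvroKey[GenericRecord]()
  val record = new GenericData.Record(schema)
  record.put("empname", "John")
  wrapper.datum(record)
  wrapper
}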
and I create the Avro RDD as follows:
val avroRDD = fieldsRDD.map(x => convert2Avro(x, schema))
While executing, I get the following exception on the line above:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
Any pointers?
Recommended answer
The Schema.RecordSchema class does not implement Serializable, so it cannot be transferred over the network. The function passed to map captures schema, and Spark has to serialize that closure to ship it to the executors, which is why the error is raised as "Task not serializable". Instead, convert the schema to a string (which is serializable), pass the string to the method, and reconstruct the Schema object inside the method.
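The failure comes from Java serialization of the closure rather than from Avro itself. The following minimal, dependency-free Scala sketch reproduces the mechanism. FakeSchema is a hypothetical stand-in for Schema$RecordSchema, and isSerializable is a hypothetical helper that only approximates, with a plain ObjectOutputStream, the check Spark's ClosureCleaner performs. It assumes standard Scala function literals, which the compiler makes serializable, so whether a closure can be serialized depends on what it captures.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for org.apache.avro.Schema$RecordSchema:
// it deliberately does NOT extend java.io.Serializable.
class FakeSchema(val json: String)

object ClosureSerializationDemo {
  // Approximates Spark's closure check: try to write the object with Java serialization.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // The closures are built inside a method so they capture local values,
  // just as the closure passed to fieldsRDD.map captures `schema`.
  def closures(): (String => String, String => String) = {
    val schema = new FakeSchema("""{"type":"record","name":"Employee","fields":[]}""")

    // Captures the non-serializable object itself (like the question's code).
    val captureObject: String => String = x => schema.json + x

    // Captures only the String and rebuilds the object inside the closure (like the answer).
    val schemaString = schema.json
    val captureString: String => String = x => new FakeSchema(schemaString).json + x

    (captureObject, captureString)
  }

  def main(args: Array[String]): Unit = {
    val (captureObject, captureString) = closures()
    println(s"captures FakeSchema -> serializable: ${isSerializable(captureObject)}")
    println(s"captures String     -> serializable: ${isSerializable(captureString)}")
  }
}
```

Only the closure that holds the String survives serialization, which is the property the answer relies on.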
// Schema.toString returns the schema as JSON; a String is serializable.
val schemaString = schema.toString
val avroRDD = fieldsRDD.map(x => convert2Avro(x, schemaString))
Reconstruct the schema inside the method:
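import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey

def convert2Avro(data: String, schemaString: String): AvroKey[GenericRecord] = {
  // Rebuild the Schema on the executor. A fresh Parser is used per call because
  // a Schema.Parser refuses to redefine a named type it has already parsed.
  val schema = new Schema.Parser().parse(schemaString)
  val wrapper = new AvroKey[GenericRecord]()
  val record = new GenericData.Record(schema)
  record.put("empname", "John")
  wrapper.datum(record)
  wrapper
}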
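Because convert2Avro is called once per element, the approach above re-parses the schema for every record. A common variant parses it once per partition with RDD.mapPartitions. The sketch below assumes avro and avro-mapred are on the classpath; AvroConversion and convertPartition are hypothetical names, and storing each input string in the empname field is an illustrative assumption (the question hard-codes "John"). The helper lives in a top-level object so that the Spark closure calling it does not capture an enclosing, possibly non-serializable, instance.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey

// Hypothetical helper object (not part of the original answer).
object AvroConversion {
  // Converts all rows of one partition, parsing the schema a single time.
  def convertPartition(schemaString: String, rows: Iterator[String]): Iterator[AvroKey[GenericRecord]] = {
    val schema = new Schema.Parser().parse(schemaString)
    rows.map { data =>
      val record = new GenericData.Record(schema)
      record.put("empname", data) // assumption: each input row is the employee name
      new AvroKey[GenericRecord](record)
    }
  }
}

// Possible usage in the driver, with fieldsRDD: RDD[String] as in the question:
//   val schemaString = schema.toString
//   val avroRDD = fieldsRDD.mapPartitions(rows => AvroConversion.convertPartition(schemaString, rows))
```

The closure still captures only schemaString, so it serializes for the same reason as the answer's version.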