Spark: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema


Question

I am creating an Avro RDD with the following code.

 def convert2Avro(data: String, schema: Schema): AvroKey[GenericRecord] = {
   val wrapper = new AvroKey[GenericRecord]()
   val record = new GenericData.Record(schema)
   record.put("empname", "John")
   wrapper.datum(record)
   wrapper
 }

The Avro RDD is then created as follows:

 var avroRDD = fieldsRDD.map(x =>(convert2Avro(x, schema)))

While executing, I get the following exception at the line above:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.map(RDD.scala:270)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)

Any pointers?

Recommended answer

The Schema.RecordSchema class does not implement java.io.Serializable. The function passed to map captures the schema object, so Spark cannot serialize that function and send it over the network to the executors. A workaround is to convert the schema to a string, pass the string to the method, and reconstruct the Schema object inside the method.

var schemaString = schema.toString
var avroRDD = fieldsRDD.map(x =>(convert2Avro(x, schemaString)))
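
Why a String can be shipped where the Schema object cannot is something that can be checked without Spark or Avro. The sketch below is only an illustration under stated assumptions: FakeSchema is a hypothetical stand-in for a class that, like Schema.RecordSchema in the question, does not implement java.io.Serializable, and Some(...) plays the role of a task holding a captured value. The names SerializationDemo and isJavaSerializable are made up for this example. It mimics, roughly, the kind of check behind the ClosureCleaner.ensureSerializable frame in the stack trace.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationDemo {
  // Hypothetical stand-in for a class that does not implement
  // java.io.Serializable (like Schema.RecordSchema in the question).
  class FakeSchema(val json: String) {
    override def toString: String = json
  }

  // Returns true if Java serialization accepts the object graph rooted at obj.
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    val schema = new FakeSchema("""{"type":"record","name":"Emp","fields":[]}""")
    // Some(...) is serializable itself, but only if what it holds is too.
    println(isJavaSerializable(Some(schema)))          // prints false
    println(isJavaSerializable(Some(schema.toString))) // prints true
  }
}
```

The first check fails for the same reason the map call does: the container is serializable, but something it references is not. Capturing only the string form avoids the problem.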

Reconstruct the schema inside the method:

def convert2Avro(data: String, schemaString: String): AvroKey[GenericRecord] = {
  // Rebuild the Schema on the executor from its JSON string form
  val schema = new Schema.Parser().parse(schemaString)
  val wrapper = new AvroKey[GenericRecord]()
  val record = new GenericData.Record(schema)
  record.put("empname", "John")
  wrapper.datum(record)
  wrapper
}
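
Note that this method parses the schema string once for every record. A possible variation, not part of the original answer, is to parse it once per partition with mapPartitions. The sketch below assumes Avro and avro-mapred are on the classpath. PartitionConversion and convertPartition are hypothetical names, and using each input line as the empname value is an assumption made purely for illustration.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.mapred.AvroKey

object PartitionConversion {
  // Hypothetical helper: converts all lines of one partition while parsing
  // the schema string only once for that partition.
  def convertPartition(lines: Iterator[String],
                       schemaString: String): Iterator[AvroKey[GenericRecord]] = {
    // Built on the executor, so the Schema object itself is never serialized
    val schema = new Schema.Parser().parse(schemaString)
    lines.map { line =>
      val record = new GenericData.Record(schema)
      record.put("empname", line) // assumption: each line is the employee name
      new AvroKey[GenericRecord](record)
    }
  }
}

// Usage with the RDD from the question (only the String is captured):
//   val schemaString = schema.toString
//   val avroRDD = fieldsRDD.mapPartitions(
//     PartitionConversion.convertPartition(_, schemaString))
```

The helper takes a plain Iterator, so it can be tried without a SparkContext, for example by passing Iterator("John", "Jane") and a schema string that declares a string field named empname.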
