Task not serializable while using custom dataframe class in Spark Scala


Problem Description

I am facing a strange issue with Scala/Spark (1.5) and Zeppelin:

If I run the following Scala/Spark code, it will run properly:

// TEST NO PROBLEM SERIALIZATION
val rdd = sc.parallelize(Seq(1, 2, 3))
val testList = List[String]("a", "b")

rdd.map{a => 
    val aa = testList(0)
    None}

However, after declaring a custom dataframe type as proposed here

//DATAFRAME EXTENSION
import org.apache.spark.sql.DataFrame

object ExtraDataFrameOperations {
  implicit class DFWithExtraOperations(df : DataFrame) {

    //drop several columns
    def drop(colToDrop:Seq[String]):DataFrame = {
        var df_temp = df
        colToDrop.foreach{ case (f: String) =>
            df_temp = df_temp.drop(f)//can be improved with Spark 2.0
        }
        df_temp
    }   
  }
}

and using it, for example, as follows:

//READ ALL THE FILES INTO different DF and save into map
import ExtraDataFrameOperations._
val filename = "myInput.csv"

val delimiter =  ","

val colToIgnore = Seq("c_9", "c_10")

val inputICFfolder = "hdfs:///group/project/TestSpark/"

val df = sqlContext.read
            .format("com.databricks.spark.csv")
            .option("header", "true") // Use first line of all files as header
            .option("inferSchema", "false") // Automatically infer data types? => no cause we need to merge all df, with potential null values => keep string only
            .option("delimiter", delimiter)
            .option("charset", "UTF-8")
            .load(inputICFfolder + filename)
            .drop(colToIgnore)//call the customize dataframe

This runs successfully.

Now if I run the following code again (same as above):

// TEST NO PROBLEM SERIALIZATION
val rdd = sc.parallelize(Seq(1, 2, 3))
val testList = List[String]("a", "b")
rdd.map{a => 
    val aa = testList(0)
    None}

I get the following error message:

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at :32
testList: List[String] = List(a, b)
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:314)
    ...
Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$ExtraDataFrameOperations$
Serialization stack:
    - object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$ExtraDataFrameOperations$, value: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$ExtraDataFrameOperations$@6c7e70e)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: ExtraDataFrameOperations$module, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$ExtraDataFrameOperations$)
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@4c6d0802)
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
    ...

I don't understand:

  • Why does this error occur when no action is performed on the dataframe?
  • Why does serialization fail now, when "ExtraDataFrameOperations" was used successfully before?

UPDATE:

Trying

@inline val testList = List[String]("a", "b")

does not help.

Recommended Answer

Just add 'extends Serializable'. This worked for me:

import java.util.concurrent.atomic.AtomicReference

import org.apache.avro.generic.GenericRecord
import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}
import org.apache.spark.sql.Dataset

/**
  * A wrapper around a Dataset of ProducerRecord that allows saving the Dataset to Kafka.
  *
  * The KafkaProducer is shared by all threads in one executor.
  * Error handling strategy: remember the "last" seen exception and rethrow it to let the task fail.
  */
implicit class DatasetKafkaSink(ds: Dataset[ProducerRecord[String, GenericRecord]]) extends Serializable {

  class ExceptionRegisteringCallback extends Callback {
    private[this] val lastRegisteredException = new AtomicReference[Option[Exception]](None)

    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
      Option(exception) match {
        case a @ Some(_) => lastRegisteredException.set(a) // (re)-register the exception if the send failed
        case _ => // do nothing on a successful send
      }
    }

    def rethrowException(): Unit = lastRegisteredException.getAndSet(None).foreach(e => throw e)
  }

  /**
    * Save to Kafka, reusing the KafkaProducer from a singleton holder.
    * Returns control only once all records have actually been sent to Kafka; in case of error the "last"
    * seen exception is rethrown in the same thread so the Spark task can fail.
    */
  def saveToKafka(kafkaProducerConfigs: Map[String, AnyRef]): Unit = {
    ds.foreachPartition { records =>
      val callback = new ExceptionRegisteringCallback
      // KafkaProducerHolder is the answerer's own per-executor singleton around KafkaProducer (not shown here)
      val producer = KafkaProducerHolder.getInstance(kafkaProducerConfigs)

      records.foreach(record => producer.send(record, callback))

      producer.flush()
      callback.rethrowException()
    }
  }
}
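
For comparison, a minimal sketch of the same fix applied to the ExtraDataFrameOperations object from the question might look like the code below. Adding 'extends Serializable' to both the object and the implicit class is an assumption here (the answer only states that 'extends Serializable' fixed it); everything else is unchanged from the question.

//DATAFRAME EXTENSION, now serializable (sketch based on the answer above)
import org.apache.spark.sql.DataFrame

object ExtraDataFrameOperations extends Serializable {
  implicit class DFWithExtraOperations(df: DataFrame) extends Serializable {

    // drop several columns
    def drop(colToDrop: Seq[String]): DataFrame = {
      var df_temp = df
      colToDrop.foreach { f =>
        df_temp = df_temp.drop(f) // can be improved with Spark 2.0
      }
      df_temp
    }
  }
}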
