org.apache.spark.SparkException: Task not serializable Caused by: java.io.NotSerializableException

Problem Description

I have two Scala code bases - MyMain.scala and MyFunction.scala - built separately; the jar built from MyFunction acts as a UDF inside MyMain.

MyFunction.scala basically contains a Java class with a public method public String myFunc(String val0, String val1). The project is built with SBT, and the build_jar compile output is stored as an artifact (only the required class, i.e. MyFunction.class, not the dependencies).
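
For orientation, the shape of that class is roughly as follows. This is only an illustrative sketch: the real class is written in Java and shipped inside MyFunction.jar, and the method body below is a placeholder, not the actual implementation.

// Scala rendering of the described Java API: public String myFunc(String val0, String val1)
class MyFunction {
  def myFunc(val0: String, val1: String): String = {
    // placeholder body for illustration only
    val0 + ":" + val1
  }
}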

MyMain.scala imports the above artifact jar into its lib folder, which is added to the classpath using unmanagedBase := baseDirectory.value / "lib" in build.sbt.
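
For reference, the relevant part of build.sbt would look roughly like this. This is a hedged sketch: the project name, Scala version and Spark dependency are assumptions added for illustration; only the unmanagedBase line is quoted from the question.

name := "MyMain"

scalaVersion := "2.11.12"  // assumption

// pick up lib/MyFunction.jar as an unmanaged (local) dependency
unmanagedBase := baseDirectory.value / "lib"

// assumption: Spark is provided by the cluster / spark-shell at runtime
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided"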

So the MyMain.scala project structure is as follows:

MyMain
 |- lib/MyFunction.jar
 |      |- META-INF/MANIFEST.MF
 |      |- MyFunction.class
 |- project
 |- src/main/scala/MyMain.scala
 |- build.sbt

What I need to do

I want to define a UDF in MyMain.scala on the MyFunction.class within MyFunction.jar, which is added to the lib classpath. I have defined the UDF, but when I try to use it on a Spark dataframe inside MyMain.scala, it throws a "Task not serializable" java.io.NotSerializableException as below:

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:747)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:724)
  at MyMain$.main(<pastie>:253)
  ... 58 elided
Caused by: java.io.NotSerializableException: MyMain$
Serialization stack:
    - object not serializable (class: MyMain$, value: MyMain$@11f25cf)
    - field (class: $iw, name: MyMain$module, type: class MyMain$)
    - object (class $iw, $iw@540705e8)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7e6e1038)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@7587f2a0)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5e00f263)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@3fbfe419)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5172e87b)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5ec96f75)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@26f6de78)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@18c3bc83)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@35d674ee)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@5712092f)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6980c2e6)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@6ce299e)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@406b8acb)
    - field (class: $iw, name: $iw, type: class $iw)
    - object (class $iw, $iw@73d71e61)
    - field (class: $line47.$read, name: $iw, type: class $iw)
    - object (class $line47.$read, $line47.$read@72ee2f87)
    - field (class: $iw, name: $line47$read, type: class $line47.$read)
    - object (class $iw, $iw@22c4de5a)
    - field (class: $iw, name: $outer, type: class $iw)
    - object (class $iw, $iw@3daea539)
    - field (class: $anonfun$1, name: $outer, type: class $iw)
    - object (class $anonfun$1, <function2>)
    - element of array (index: 9)
    - array (class [Ljava.lang.Object;, size 15)
    - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
    - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
  ... 92 more

What is the reason

MyMain.scala is referring to some non-serializable instance of a class inside a transformation on the Spark dataframe.
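
As general background, the classic way to hit this class of error is to capture a non-serializable value in the UDF's closure. The following minimal sketch uses hypothetical names (Helper, ClosureDemo) that are not taken from the question, and reproduces the same kind of failure:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

// A plain class that does NOT extend Serializable
class Helper {
  def transform(s: String): String = s.toUpperCase
}

object ClosureDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("closure-demo").getOrCreate()
    import spark.implicits._

    val df = Seq("a", "b").toDF("id")

    val helper = new Helper()                           // captured by the lambda below
    val badUdf = udf((s: String) => helper.transform(s))

    // Forcing an action makes Spark serialize the closure; because the lambda
    // captured `helper` (a non-serializable Helper), this throws
    // org.apache.spark.SparkException: Task not serializable,
    // caused by java.io.NotSerializableException: Helper
    df.withColumn("out", badUdf(col("id"))).show()
  }
}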

What I have tried

// Wrapper singleton around the MyFunction class shipped in MyFunction.jar
object MyFunction extends Serializable {
  val myFuncSingleton = new MyFunction()
  def getMyFunc(var0: String, var1: String): String = {
    myFuncSingleton.myFunc(var0, var1)
  }
}

import org.apache.spark.sql.functions.udf
val myUDF = udf((val0: String, val1: String) => { MyFunction.getMyFunc(val0, val1) })

object MyMain {
  val spark = ...
  val hadoopfs = ...
  def main(args: Array[String]): Unit = {
    val df1 = ...
    val df2 = df1.withColumn("reg_id", myUDF(lit("Subscriber"), col("id")))
  }
}

See the following link: How to resolve not serializable errors when instantiating objects in spark udfs

Recommended Answer

A minor tweak to the code resolved my issue.

Though I don't fully understand the internal workings of the Scala compiler and how it handles UDFs, I will try to explain my solution and what could have been the possible reason for the Task not serializable error:

  1. The use of the myUDF variable within withColumn(...) is not inside any RDD closure.
  2. Inside the udf(...) definition outside the driver program, calling getMyFunc(...) on the Scala object MyFunction is equivalent to calling a static method, so the MyFunction object does not need to be serialized: it is used as a singleton object, not as an instance of the MyFunction class (defined inside MyFunction.jar). This explains the change of the MyFunction definition from object MyFunction extends Serializable to object MyFunction.
  3. However, inside the "wrapper" singleton MyFunction object, myFuncSingleton is defined as an instance of the MyFunction class (in the jar), and myFuncSingleton.myFunc(...) calls the myFunc(...) method of this instance.
  4. However, the myFuncSingleton instance, and its MyFunction class, referenced in the driver program through myUDF, sit outside the RDD closure (as mentioned in 1.), and hence the MyFunction class needs to be explicitly serializable, i.e. public class MyFunction implements java.io.Serializable (since the jar is built from a Java class); see the sketch after this list.
  5. As mentioned in 1., since the UDF call within withColumn(...) is not within an RDD closure, the MyMain object needs to be serialized to make the UDF available across partitions, i.e. object MyMain extends Serializable.
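
The change implied by point 4 lives inside MyFunction.jar itself, so it does not show up in the revised wrapper code below. As a minimal sketch of the required shape: the real class is a Java class declared as public class MyFunction implements java.io.Serializable; the Scala equivalent, with a placeholder body, would be:

// Scala equivalent of: public class MyFunction implements java.io.Serializable
class MyFunction extends Serializable {
  def myFunc(val0: String, val1: String): String = {
    // placeholder body; the real implementation ships inside MyFunction.jar
    val0 + ":" + val1
  }
}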

// MyFunction no longer needs to extend Serializable: it is only a wrapper
// singleton, and getMyFunc behaves like a static method
object MyFunction {
  val myFuncSingleton = new MyFunction()
  def getMyFunc(var0: String, var1: String): String = {
    myFuncSingleton.myFunc(var0, var1)
  }
}

import org.apache.spark.sql.functions.udf
val myUDF = udf((val0: String, val1: String) => { MyFunction.getMyFunc(val0, val1) })

// MyMain now extends Serializable so that the UDF defined against it can be
// shipped across partitions
object MyMain extends Serializable {
  val spark = ...
  val hadoopfs = ...
  def main(args: Array[String]): Unit = {
    val df1 = ...
    val df2 = df1.withColumn("reg_id", myUDF(lit("Subscriber"), col("id")))
  }
}
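
For completeness, a hypothetical smoke test of the fixed UDF on a tiny in-memory DataFrame. The local-mode SparkSession, the sample values and the "id" column are assumptions for illustration, since the real df1 is elided above; myUDF is the UDF defined in the snippet above.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

val spark = SparkSession.builder().master("local[*]").appName("myudf-smoke-test").getOrCreate()
import spark.implicits._

val df1 = Seq("1001", "1002").toDF("id")   // stand-in for the real df1
val df2 = df1.withColumn("reg_id", myUDF(lit("Subscriber"), col("id")))
df2.show(false)                            // should no longer throw Task not serializable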

Note:

  • To summarize, I am invoking the MyFunction instance method through a static-style method call on the MyFunction singleton object. So val myFuncVar = new MyFunction() would be a more appropriate name than val myFuncSingleton = new MyFunction().
  • I don't fully understand the finer nuances of RDD closures, and I am not certain whether withColumn() is outside the RDD closure, but I have assumed so for the purpose of this explanation.

I got a very good explanation here: How Spark handles objects
