org.apache.spark.SparkException: Task not serializable Caused by: java.io.NotSerializableException
Problem Description
I have two Scala codes - MyMain.scala and MyFunction.scala, built separately and built jar of MyFunction will act as UDF in MyMain.
MyFunction.scala basically contains a Java class with a public method public String myFunc(String val0, String val1). The project is built in SBT, and the compiled output is stored as an artifact (only the required class, i.e. MyFunction.class, not the dependencies).
MyMain.scala imports the above artifact jar into a lib folder, which is added to the classpath using unmanagedBase := baseDirectory.value / "lib" in build.sbt.
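For context, a minimal build.sbt sketch for this setup (the project name, Scala version, and Spark version are assumptions, not from the original post):

```scala
// build.sbt -- minimal sketch; name and versions are assumptions
name := "MyMain"
scalaVersion := "2.11.12"

// pick up MyFunction.jar (and any other local jars) from the lib/ folder
unmanagedBase := baseDirectory.value / "lib"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided"
```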
因此MyMain.scala项目结构如下:
So MyMain.scala project structure is as follows:
MyMain
|
-lib/MyFunction.jar
|
- META-INF/MANIFEST.MF
- MyFunction.class
-project
-src/main/scala/MyMain.scala
-build.sbt
What I need to do
I want to define a UDF in MyMain.scala on the MyFunction.class within the MyFunction.jar which is added to the lib classpath. I have defined the UDF but when I am trying to use it on a Spark dataframe inside MyMain.scala, it is throwing "Task not serializable" java.io.NotSerializableException as below:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:616)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:339)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
at org.apache.spark.sql.Dataset.show(Dataset.scala:747)
at org.apache.spark.sql.Dataset.show(Dataset.scala:724)
at MyMain$.main(<pastie>:253)
... 58 elided
Caused by: java.io.NotSerializableException: MyMain$
Serialization stack:
- object not serializable (class: MyMain$, value: MyMain$@11f25cf)
- field (class: $iw, name: MyMain$module, type: class MyMain$)
- object (class $iw, $iw@540705e8)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7e6e1038)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7587f2a0)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5e00f263)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@3fbfe419)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5172e87b)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5ec96f75)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@26f6de78)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@18c3bc83)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@35d674ee)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5712092f)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6980c2e6)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6ce299e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@406b8acb)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@73d71e61)
- field (class: $line47.$read, name: $iw, type: class $iw)
- object (class $line47.$read, $line47.$read@72ee2f87)
- field (class: $iw, name: $line47$read, type: class $line47.$read)
- object (class $iw, $iw@22c4de5a)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@3daea539)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function2>)
- element of array (index: 9)
- array (class [Ljava.lang.Object;, size 15)
- field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, name: references$1, type: class [Ljava.lang.Object;)
- object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11, <function2>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 92 more
What is the cause
MyMain.scala is referring to some non-serializable instance of a class inside a transformation on the Spark dataframe.
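The failure mechanism can be reproduced without Spark. Below is a minimal sketch (class names are made up for illustration) that applies the same Java-serialization check Spark's ClosureCleaner performs before shipping a task:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-ins for the class packaged inside MyFunction.jar. The names are made up
// for illustration; this sketch runs without Spark.
class PlainFunction { def myFunc(v0: String, v1: String): String = v0 + "_" + v1 }
class MarkedFunction extends Serializable {
  def myFunc(v0: String, v1: String): String = v0 + "_" + v1
}

// The same Java-serialization check Spark's ClosureCleaner runs before shipping
// a task: serialize the closure, and fail if anything it captures cannot be written.
def canSerialize(obj: AnyRef): Boolean =
  try { new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj); true }
  catch { case _: NotSerializableException => false }

// Each function value captures the instance it calls, just as a UDF closure does.
def badClosure: (String, String) => String = {
  val f = new PlainFunction
  (v0, v1) => f.myFunc(v0, v1)
}
def goodClosure: (String, String) => String = {
  val f = new MarkedFunction
  (v0, v1) => f.myFunc(v0, v1)
}

println(canSerialize(badClosure))  // false: captured PlainFunction is not Serializable
println(canSerialize(goodClosure)) // true: safe to ship to executors
```

If the check fails, Spark wraps the failure in the "Task not serializable" SparkException seen above.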
What I have tried
object MyFunction extends Serializable {
  val myFuncSingleton = new MyFunction()
  def getMyFunc(var0: String, var1: String): String = {
    myFuncSingleton.myFunc(var0, var1)
  }
}
import org.apache.spark.sql.functions.udf
val myUDF = udf((val0: String, val1: String) => { MyFunction.getMyFunc(val0, val1) })
object MyMain {
  val spark = ...
  val hadoopfs = ...
  def main(args: Array[String]): Unit = {
    val df1 = ...
    val df2 = df1.withColumn("reg_id", myUDF(lit("Subscriber"), col("id")))
  }
}
See this link: How to resolve non-serializable errors when instantiating objects in Spark UDFs
Recommended Answer
A minor tweak to the code resolved my issue.
Though I don't fully understand the internal workings of the Scala compiler and how it handles UDFs, I will try to explain my solution and the possible reason for the Task not serializable error:
1. The use of the myUDF variable within withColumn(...) is not inside any RDD closure.
2. Inside the udf(...) definition outside the driver program, calling getMyFunc(...) on the Scala object MyFunction is equivalent to calling a static method, so the MyFunction object does not need to be serialized: it is used as a singleton object, not as an instance of the MyFunction class (defined inside MyFunction.jar). This explains the change of the MyFunction definition from object MyFunction extends Serializable to object MyFunction.
3. However, inside the "wrapper" singleton MyFunction object, myFuncSingleton is defined as an instance of the MyFunction class (in the jar), and myFuncSingleton.myFunc(...) calls the myFunc(...) method of this instance.
4. However, the myFuncSingleton instance and its MyFunction class are referenced in the driver program through myUDF outside the RDD closure (as mentioned in 1.), so the MyFunction class needs to be explicitly serializable, i.e. public class MyFunction implements java.io.Serializable (since the jar is built on a Java class).
5. As mentioned in 1., since the UDF call within withColumn(...) is not within an RDD closure, the MyMain object needs to be serialized to make the UDF available across partitions, i.e. object MyMain extends Serializable.
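The requirement that the class inside the jar be Java-serializable can be checked without Spark. A standalone Scala sketch follows (the Java equivalent in the jar would be public class MyFunction implements java.io.Serializable; the method body here is invented for illustration):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Scala stand-in for the Java class inside MyFunction.jar, now opted in to
// Java serialization (in Java: public class MyFunction implements java.io.Serializable).
class MyFunction extends Serializable {
  def myFunc(v0: String, v1: String): String = v0 + "_" + v1
}

// Round-trip an instance through Java serialization, the way Spark moves a task
// (and everything its closure references) from the driver to an executor.
def roundTrip[T <: Serializable](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)).readObject().asInstanceOf[T]
}

val copy = roundTrip(new MyFunction())
println(copy.myFunc("Subscriber", "42")) // Subscriber_42
```

If MyFunction did not extend Serializable, the writeObject call would throw the same java.io.NotSerializableException as in the stack trace above.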
object MyFunction {
  val myFuncSingleton = new MyFunction()
  def getMyFunc(var0: String, var1: String): String = {
    myFuncSingleton.myFunc(var0, var1)
  }
}

import org.apache.spark.sql.functions.udf
val myUDF = udf((val0: String, val1: String) => { MyFunction.getMyFunc(val0, val1) })

object MyMain extends Serializable {
  val spark = ...
  val hadoopfs = ...
  def main(args: Array[String]): Unit = {
    val df1 = ...
    val df2 = df1.withColumn("reg_id", myUDF(lit("Subscriber"), col("id")))
  }
}
Note:
- To sum up, I am calling a MyFunction instance method through a static method call on the MyFunction singleton object. So val myFuncVar = new MyFunction() would be a more appropriate name than val myFuncSingleton = new MyFunction().
- I don't fully understand the nuances of RDD closures, and I am not sure whether withColumn() is outside the RDD closure, but I assumed so for the purpose of this explanation.
Got some very good explanation here: How Spark handles object