Spark Dataframe stat throwing Task not serializable
Problem description
I'm trying to calculate some statistics for a DataFrame/Dataset in Spark that is read from a directory of .parquet files about US flights between 2013 and 2015. More specifically, I'm using the approxQuantile method of DataFrameStatFunctions, which can be accessed by calling the stat method on a Dataset. See the documentation.
import airportCaseStudy.model.Flight
import org.apache.spark.sql.SparkSession

object CaseStudy {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder
      .master("local[*]")
      .getOrCreate
    val sc = spark.sparkContext
    sc.setLogLevel("ERROR")
    import spark.sqlContext.implicits._
    val flights = spark
      .read
      .parquet("C:\\Users\\Bluetab\\IdeaProjects\\GraphFramesSparkPlayground\\src\\resources\\flights")
      .as[Flight]
    flights.show()
    flights.printSchema()
    flights.describe("year", "flightEpochSeconds").show()
    val approxQuantiles = flights.stat
      .approxQuantile(Array("year", "flightEpochSeconds"), Array(0.25, 0.5, 0.75), 0.25)
    // whatever...
  }
}
Flight is just a case class.
package airportCaseStudy.model

case class Flight(year: Int, quarter: Int, month: Int, dayOfMonth: Int, dayOfWeek: Int, flightDate: String,
                  uniqueCarrier: String, airlineID: String, carrier: String, tailNum: String, flightNum: Int,
                  originAirportID: String, origin: String, originCityName: String, dstAirportID: String,
                  dst: String, dstCityName: String, taxiOut: Float, taxiIn: Float, cancelled: Boolean,
                  diverted: Float, actualETMinutes: Float, airTimeMinutes: Float, distanceMiles: Float, flightEpochSeconds: Long)
What's the issue?
I'm using Spark 2.4.0.
Executing val approxQuantiles = flights.stat.approxQuantile(Array("year", "flightEpochSeconds"), Array(0.25, 0.5, 0.75), 0.25) fails with the exception below, which reports that a task is not serializable. I spent some time reading related links, but I can't figure out why this exception is thrown.
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2326)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$combineByKeyWithClassTag$1(PairRDDFunctions.scala:88)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.PairRDDFunctions.combineByKeyWithClassTag(PairRDDFunctions.scala:77)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$foldByKey$1(PairRDDFunctions.scala:222)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.PairRDDFunctions.foldByKey(PairRDDFunctions.scala:211)
at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1158)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1137)
at org.apache.spark.sql.execution.stat.StatFunctions$.multipleApproxQuantiles(StatFunctions.scala:102)
at org.apache.spark.sql.DataFrameStatFunctions.approxQuantile(DataFrameStatFunctions.scala:104)
at airportCaseStudy.CaseStudy$.main(CaseStudy.scala:27)
at airportCaseStudy.CaseStudy.main(CaseStudy.scala)
Caused by: java.io.NotSerializableException: scala.runtime.LazyRef
Serialization stack:
- object not serializable (class: scala.runtime.LazyRef, value: LazyRef thunk)
- element of array (index: 2)
- array (class [Ljava.lang.Object;, size 3)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.PairRDDFunctions, functionalInterfaceMethod=scala/Function0.apply:()Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/PairRDDFunctions.$anonfun$foldByKey$2:(Lorg/apache/spark/rdd/PairRDDFunctions;[BLscala/runtime/LazyRef;)Ljava/lang/Object;, instantiatedMethodType=()Ljava/lang/Object;, numCaptured=3])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.rdd.PairRDDFunctions$$Lambda$2158/61210602, org.apache.spark.rdd.PairRDDFunctions$$Lambda$2158/61210602@165a5979)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 2)
- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.rdd.PairRDDFunctions, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/rdd/PairRDDFunctions.$anonfun$foldByKey$3:(Lscala/Function0;Lscala/Function2;Ljava/lang/Object;)Ljava/lang/Object;, instantiatedMethodType=(Ljava/lang/Object;)Ljava/lang/Object;, numCaptured=2])
- writeReplace data (class: java.lang.invoke.SerializedLambda)
- object (class org.apache.spark.rdd.PairRDDFunctions$$Lambda$2159/758750856, org.apache.spark.rdd.PairRDDFunctions$$Lambda$2159/758750856@6a6e410c)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400)
... 22 more
I appreciate any help you can provide.
Recommended answer
Add "extends Serializable" to your class or object.
// The same applies to an object: object Test extends Serializable { ... }
class Test extends Serializable {
  // your code here
}
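To make the effect of "extends Serializable" concrete, here is a minimal sketch that runs without Spark. The stack trace in the question shows ClosureCleaner.ensureSerializable going through Spark's JavaSerializer, so the sketch uses plain Java serialization to show the kind of check that fails. PlainHelper, SerializableHelper and SerializationCheck are hypothetical names made up for this illustration, not part of the question's code. Note that the trace in the question names scala.runtime.LazyRef, captured inside Spark's own PairRDDFunctions.foldByKey, so whether marking user classes as Serializable resolves that particular trace is not confirmed here.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical classes, for illustration only.
class PlainHelper(val factor: Int)                             // does not extend Serializable
class SerializableHelper(val factor: Int) extends Serializable // does extend Serializable

object SerializationCheck {
  // Returns true if Java serialization accepts the object,
  // false if it throws NotSerializableException.
  def isJavaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }

  def main(args: Array[String]): Unit = {
    println(isJavaSerializable(new PlainHelper(2)))        // prints false
    println(isJavaSerializable(new SerializableHelper(2))) // prints true
  }
}
```

A check like this can help narrow down which object in a closure is rejected before the job is submitted; the "Serialization stack" section of the exception lists the same chain of captured objects.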