Spark Task not serializable with lag Window function


Problem Description

I've noticed that after I use a Window function over a DataFrame, calling map() with a function makes Spark return a "Task not serializable" exception. This is my code:

val hc: org.apache.spark.sql.hive.HiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hc.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
def f(): String = "test"
case class P(name: String, surname: String)
val lag_result: org.apache.spark.sql.Column = lag($"name", 1).over(Window.partitionBy($"surname"))
val lista: List[P] = List(P("N1", "S1"), P("N2", "S2"), P("N2", "S2"))
val data_frame: org.apache.spark.sql.DataFrame = hc.createDataFrame(sc.parallelize(lista))
data_frame.withColumn("lag_result", lag_result).map(x => f) // throws "Task not serializable"
//data_frame.withColumn("lag_result", lag_result).map{case x => def f(): String = "test"; f}.collect // This works

Here is the stack trace:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:324)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:323)
    ...
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.Column, value: 'lag(name,1,null) windowspecdefinition(surname,UnspecifiedFrame))
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: lag_result, type: class org.apache.spark.sql.Column)
    ...

Recommended Answer

lag returns o.a.s.sql.Column, which is not serializable. The same applies to WindowSpec. In interactive mode these objects may be included as part of the closure for map:

scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val df = Seq(("foo", 1), ("bar", 2)).toDF("x", "y")
df: org.apache.spark.sql.DataFrame = [x: string, y: int]

scala> val w = Window.partitionBy("x").orderBy("y")
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@307a0097

scala> val lag_y = lag(col("y"), 1).over(w)
lag_y: org.apache.spark.sql.Column = 'lag(y,1,null) windowspecdefinition(x,y ASC,UnspecifiedFrame)

scala> def f(x: Any) = x.toString
f: (x: Any)String

scala> df.select(lag_y).map(f _).first
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
...
Caused by: java.io.NotSerializableException: org.apache.spark.sql.expressions.WindowSpec
Serialization stack:
    - object not serializable (class: org.apache.spark.sql.expressions.WindowSpec, value: org.apache.spark.sql.expressions.WindowSpec@307a0097)

A simple solution is to mark both as transient:

scala> @transient val w = Window.partitionBy("x").orderBy("y")
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@7dda1470

scala> @transient val lag_y = lag(col("y"), 1).over(w)
lag_y: org.apache.spark.sql.Column = 'lag(y,1,null) windowspecdefinition(x,y ASC,UnspecifiedFrame)

scala> df.select(lag_y).map(f _).first
res1: String = [null]     
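
For completeness, here is a small sketch of the other route hinted at by the commented-out line in the question: keep the closure passed to map fully self-contained, so it references no REPL-defined values and therefore never drags in the wrapper objects holding the non-serializable Column and WindowSpec. This is an illustrative sketch rather than part of the original answer; it reuses the df and lag_y values from the session above.

// Sketch only, not from the original answer: the closure defines its own
// helper, so it captures nothing from the REPL line objects that hold w / lag_y.
df.select(lag_y).map { row =>
  def f(x: Any): String = x.toString
  f(row)
}.first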
