Spark: Task not Serializable for UDF on DataFrame
Question
I get org.apache.spark.SparkException: Task not serializable when I try to execute the following on Spark 1.4.1:
import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat

object ConversionUtils {
  val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")
  def tsUTC(s: String): Timestamp = new Timestamp(iso8601.parse(s).getTime)
  val castTS = udf[Timestamp, String](tsUTC _)
}

val df = frame.withColumn("ts", ConversionUtils.castTS(frame("ts_str")))
df.first
Here, frame is a DataFrame that lives within a HiveContext. That data frame does not have any issues.
I have similar UDFs for integers and they work without any problem. However, the one with timestamps seems to cause problems. According to the documentation, java.sql.Timestamp implements Serializable, so that's not the problem. The same is true for SimpleDateFormat, as can be seen here.
This leads me to believe it's the UDF that's causing problems. However, I'm not sure what is wrong or how to fix it.
The relevant part of the trace:
Caused by: java.io.NotSerializableException: ...
Serialization stack:
- object not serializable (class: ..., value: ...$ConversionUtils$@63ed11dd)
- field (class: ...$ConversionUtils$$anonfun$3, name: $outer, type: class ...$ConversionUtils$)
- object (class ...$ConversionUtils$$anonfun$3, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, name: func$2, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, <function1>)
- field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf, name: f, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf, scalaUDF(ts_str#2683))
- field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Alias, scalaUDF(ts_str#2683) AS ts#7146)
- element of array (index: 35)
- array (class [Ljava.lang.Object;, size 36)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer,
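The $outer entries in the trace are the telling part: under Scala 2.10/2.11 (which Spark 1.4 builds against), the eta-expansion tsUTC _ written inside the object compiles to an anonymous function class whose $outer field points back at the ConversionUtils object itself, so serializing the UDF drags the whole (non-Serializable) object along. A minimal Spark-free sketch of the capture, reusing the question's names:

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat

object ConversionUtils {  // not Serializable: this is the object the trace complains about
  val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")
  def tsUTC(s: String): Timestamp = new Timestamp(iso8601.parse(s).getTime)
  // Eta-expansion creates a Function1 whose $outer is this object,
  // so shipping the function to executors means serializing ConversionUtils too.
  val castTS: String => Timestamp = tsUTC _
}
```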
Answer
Try:
object ConversionUtils extends Serializable {
  ...
}
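An alternative worth noting (not from the original answer, so treat it as a sketch): instead of making the object Serializable, you can write the UDF so its closure captures nothing from the enclosing object, for example by constructing the formatter inside the function. As a side benefit this avoids sharing a single SimpleDateFormat instance across tasks, since SimpleDateFormat is not thread-safe:

```scala
import java.sql.Timestamp
import java.text.SimpleDateFormat

// Build the formatter per call so the function captures no outer object.
// Slower per row than a shared instance, but serialization-safe and thread-safe.
val tsUTC: String => Timestamp = { s =>
  val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")
  new Timestamp(iso8601.parse(s).getTime)
}
// In the question's setting this would then be wrapped as:
//   val castTS = udf[Timestamp, String](tsUTC)
```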