Spark: Task not Serializable for UDF on DataFrame


Problem description

I get org.apache.spark.SparkException: Task not serializable when I try to execute the following on Spark 1.4.1:

import java.sql.{Date, Timestamp}
import java.text.SimpleDateFormat
import org.apache.spark.sql.functions.udf

object ConversionUtils {
  val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")

  def tsUTC(s: String): Timestamp = new Timestamp(iso8601.parse(s).getTime)

  val castTS = udf[Timestamp, String](tsUTC _)
}

val df = frame.withColumn("ts", ConversionUtils.castTS(frame("ts_str")))
df.first

Here, frame is a DataFrame that lives within a HiveContext. That data frame does not have any issues.

I have similar UDFs for integers and they work without any problem. However, the one with timestamps seems to cause problems. According to the documentation, java.sql.Timestamp implements Serializable, so that's not the problem. The same is true for SimpleDateFormat.

This leads me to believe it's the UDF that's causing the problem. However, I'm not sure what exactly is wrong or how to fix it.

The relevant part of the trace:

Caused by: java.io.NotSerializableException: ...
Serialization stack:
        - object not serializable (class: ..., value: ...$ConversionUtils$@63ed11dd)
        - field (class: ...$ConversionUtils$$anonfun$3, name: $outer, type: class ...$ConversionUtils$)
        - object (class ...$ConversionUtils$$anonfun$3, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, name: func$2, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2, <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUdf, name: f, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.ScalaUdf, scalaUDF(ts_str#2683))
        - field (class: org.apache.spark.sql.catalyst.expressions.Alias, name: child, type: class org.apache.spark.sql.catalyst.expressions.Expression)
        - object (class org.apache.spark.sql.catalyst.expressions.Alias, scalaUDF(ts_str#2683) AS ts#7146)
        - element of array (index: 35)
        - array (class [Ljava.lang.Object;, size 36)
        - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
        - object (class scala.collection.mutable.ArrayBuffer,

Answer

Try:

object ConversionUtils extends Serializable {
  ...
}
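
This works because of what the serialization stack shows: the eta-expanded tsUTC _ compiles to an anonymous function whose $outer field points back at the ConversionUtils$ object, so Spark has to serialize the whole object along with the UDF, and plain Scala objects are not Serializable by default. Extending Serializable satisfies that requirement. An alternative is to avoid the capture entirely by making the UDF body self-contained, so the closure references nothing from the enclosing object. A minimal sketch under the same Spark 1.4 API (note it deliberately constructs the non-thread-safe SimpleDateFormat per call, trading a small allocation cost for safety on executor threads):

import java.sql.Timestamp
import java.text.SimpleDateFormat
import org.apache.spark.sql.functions.udf

object ConversionUtils {
  // The lambda references nothing from the enclosing object, so the
  // generated closure carries no $outer pointer and only the function
  // itself is serialized, not ConversionUtils.
  val castTS = udf { (s: String) =>
    // SimpleDateFormat is not thread-safe; building it per invocation
    // avoids sharing one instance across executor threads.
    val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")
    new Timestamp(iso8601.parse(s).getTime)
  }
}

Usage is unchanged: frame.withColumn("ts", ConversionUtils.castTS(frame("ts_str"))).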
