Scala 和 Spark UDF 函数 [英] Scala and Spark UDF function
问题描述
我制作了一个简单的 UDF 来转换或从 spark 中临时表中的时间字段中提取一些值.我注册了该函数,但是当我使用 sql 调用该函数时,它会抛出 NullPointerException.下面是我的函数和执行过程.我正在使用齐柏林飞艇.奇怪的是,这是昨天工作,但今天早上停止工作.
I made a simple UDF to convert or extract some values from a time field in a temptabl in spark. I register the function but when I call the function using sql it throws a NullPointerException. Below is my function and process of executing it. I am using Zeppelin. Strangly this was working yesterday but it stopped working this morning.
功能
def convert( time:String ) : String = {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
return sdf.format(time1)
}
注册函数
sqlContext.udf.register("convert",convert _)
在没有 SQL 的情况下测试函数 -- 这有效
Test the function without SQL -- This works
convert(12:12:12) -> returns 12:12
在 Zeppelin 中使用 SQL 测试函数失败.
Test the function with SQL in Zeppelin this FAILS.
%sql
select convert(time) from temptable limit 10
temptable的结构
Structure of temptable
root
|-- date: string (nullable = true)
|-- time: string (nullable = true)
|-- serverip: string (nullable = true)
|-- request: string (nullable = true)
|-- resource: string (nullable = true)
|-- protocol: integer (nullable = true)
|-- sourceip: string (nullable = true)
我得到的堆栈跟踪的一部分.
Part of the stacktrace that I am getting.
java.lang.NullPointerException
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)
推荐答案
使用udf而不是直接定义一个函数
Use udf instead of define a function directly
import org.apache.spark.sql.functions._
val convert = udf[String, String](time => {
val sdf = new java.text.SimpleDateFormat("HH:mm")
val time1 = sdf.parse(time)
sdf.format(time1)
}
)
udf 的输入参数是列(或列).返回类型是列.
A udf's input parameter is Column(or Columns). And the return type is Column.
case class UserDefinedFunction protected[sql] (
f: AnyRef,
dataType: DataType,
inputTypes: Option[Seq[DataType]]) {
def apply(exprs: Column*): Column = {
Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
}
}
这篇关于Scala 和 Spark UDF 函数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!