Scala and Spark UDF function

Problem description

I made a simple UDF to convert or extract some values from a time field in a temp table in Spark. I registered the function, but when I call the function using SQL it throws a NullPointerException. Below is my function and the process of executing it. I am using Zeppelin. Strangely, this was working yesterday but it stopped working this morning.

Function

def convert(time: String): String = {
  // Parse the incoming string with an HH:mm pattern and format it back,
  // keeping only the hour and minute
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  sdf.format(time1)
}
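
Note that the temptable schema below marks time as nullable. As written, sdf.parse(time) throws a NullPointerException when time is null. A minimal null-safe sketch, an assumption for illustration and not part of the original question:

def convertSafe(time: String): String = {
  // Hypothetical variant: map null input to null output instead of throwing
  if (time == null) null
  else {
    val sdf = new java.text.SimpleDateFormat("HH:mm")
    sdf.format(sdf.parse(time))
  }
}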

Register the function

sqlContext.udf.register("convert", convert _)

Test the function without SQL -- This works

convert("12:12:12") // returns "12:12"

Test the function with SQL in Zeppelin -- this FAILS.

%sql
select convert(time) from temptable limit 10

Structure of temptable

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- serverip: string (nullable = true)
 |-- request: string (nullable = true)
 |-- resource: string (nullable = true)
 |-- protocol: integer (nullable = true)
 |-- sourceip: string (nullable = true)

Part of the stacktrace that I am getting.

java.lang.NullPointerException
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:643)
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionInfo(FunctionRegistry.java:652)
    at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:54)
    at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:376)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:44)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:44)
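
Note that the NullPointerException is thrown from FunctionRegistry.getFunctionInfo, i.e. while Spark resolves the function name against the Hive function registry, not while the body of convert executes. This suggests the registration, rather than the conversion logic, is what went wrong.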

Recommended answer

Use udf instead of defining the function directly.

import org.apache.spark.sql.functions._

// Wrap the conversion logic in a udf so Spark treats it as a Column expression
val convert = udf[String, String] { time =>
  val sdf = new java.text.SimpleDateFormat("HH:mm")
  val time1 = sdf.parse(time)
  sdf.format(time1)
}
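
With this definition, convert is applied through the DataFrame API rather than through a SQL string. A minimal usage sketch, assuming the temp table is also available as a DataFrame named df (a hypothetical name):

// df is assumed to be the DataFrame registered as temptable
val result = df.select(convert(df("time")).as("hhmm"))
result.show(10)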

A udf's input parameters are Columns (one or more), and its return type is also a Column:

case class UserDefinedFunction protected[sql] (
    f: AnyRef,
    dataType: DataType,
    inputTypes: Option[Seq[DataType]]) {

  def apply(exprs: Column*): Column = {
    Column(ScalaUDF(f, dataType, exprs.map(_.expr), inputTypes.getOrElse(Nil)))
  }
}
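
The apply method above wraps the plain Scala function f in a ScalaUDF Catalyst expression, which is why the udf value is called with Column arguments and yields a Column that can go straight into a select.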
