在Spark-SQL for Azure数据库中创建用户定义的(非临时)函数 [英] Creating User Defined (not temporary) Function in Spark-SQL for Azure Databricks

查看:16
本文介绍了在Spark-SQL for Azure数据库中创建用户定义的(非临时)函数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

也许这很愚蠢,我是一名Microsoft SQL/C#开发人员,以前从未真正使用过任何其他IDE/编写的Java/Scala。 我正在将一些Azure SQL查询迁移到Azure数据库解决方案。

似乎没有对应的TSQL DATEDIFF_BIG函数(https://docs.microsoft.com/en-us/sql/t-sql/functions/datediff-transact-sql?view=sql-server-2017)

您找到的解决方案是-编写您自己的UDF。

我已经在Scala Notebook中这样做了(见下文)-它对于临时函数很好地工作。(https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-function.html)

这是我找到的最有帮助的示例https://github.com/johnmuller87/spark-udf

有相当多的临时函数示例,但我没有找到针对非Java/Scala开发人员的永久函数。

我安装了SBT(Windows的最新版本-https://www.scala-sbt.org/1.x/docs/Installing-sbt-on-Windows.html) 我还安装了Intelj

我运行了为IBAN示例构建的SBT,但在将JAR上载到我的Clusterd并进行函数注册后,我无法获得SQL函数。

CREATE FUNCTION ValidateIBAN AS 'com.ing.wbaa.spark.udf.ValidateIBAN' USING JAR 'spark_udf_assembly_0_2_0' --without extension

SELECT ValidateIBAN('NL20INGB0001234567')

错误始终是 "错误在SQL语句中:分析异常:没有UDF/UDAF/UDTF‘com.ing.wbaa.park.udf.ValiateIBAN’的处理程序;第1行pos 7"

//import org.apache.spark.sql.types._                         // include the Spark Types to define our schema
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.functions.udf
import java.time.temporal.ChronoUnit;

// Define function to calculate local time offset
def getTimestampDifference(interval: java.lang.String, date1: java.sql.Timestamp, date2: java.sql.Timestamp) : java.lang.Long = {

  //https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
  //https://spark.apache.org/docs/2.4.0/sql-reference.html
  //https://alvinalexander.com/scala/how-to-use-scala-match-expression-like-switch-case-statement

  interval match
  {
    case "NANOSECOND"=> return ChronoUnit.NANOS.between(date1.toInstant(), date2.toInstant());
    case "MICROSECOND"=> return ChronoUnit.MICROS.between(date1.toInstant(), date2.toInstant());
    case "MILLISECOND"=> return ChronoUnit.MILLIS.between(date1.toInstant(), date2.toInstant()); // date2.getTime() - date1.getTime();
    case "SECOND"=> return ChronoUnit.SECONDS.between(date1.toInstant(), date2.toInstant());
    case "MINUTE"=> return ChronoUnit.MINUTES.between(date1.toInstant(), date2.toInstant());
    case "HOUR"=> return ChronoUnit.HOURS.between(date1.toInstant(), date2.toInstant());
    case "DAY"=> return ChronoUnit.DAYS.between(date1.toInstant(), date2.toInstant());
    case "WEEK"=> return ChronoUnit.WEEKS.between(date1.toInstant(), date2.toInstant());
    case "MONTH"=> return ChronoUnit.MONTHS.between(date1.toInstant(), date2.toInstant());
    case "YEAR"=> return ChronoUnit.YEARS.between(date1.toInstant(), date2.toInstant());
  }
}

spark.udf.register("DATETIMEDIFF", udf(getTimestampDifference(_:java.lang.String, _:java.sql.Timestamp,_:java.sql.Timestamp),LongType))
我实际上需要的是-如何将Scala笔记本转换为一个SQL函数,以便我可以在 Azure数据库集群5.4版(包括ApacheSpark 2.4.3、Scala 2.11)

  • 实现什么类
  • 实现什么方法(C#中的重写)-关于配置单元或Spark也有不同的文章
  • 如何设置构建的SBT或以任何其他方式在Java归档中编译它,以便我可以成功地创建和运行SQL函数(仅在SQL中创建和运行,而不是在PYHTON代码中,也不在Scala代码中-在SQL笔记本中)

感谢您的帮助

推荐答案

您引用的数据库中的CREATE Function语句实际上是配置单元命令,而不是Spark,它要求UDF类是配置单元UDF。

这也是出现"No Handler for UDF/UDAF/UDTF"错误的原因。您链接的示例实现了Spark UDF,而您需要的是实现配置单元UDF

要创建配置单元UDF,您需要实现一个扩展类org.apache.hadoop.hive.ql.exec.UDF的类,并实现一个名为EVALUATE的函数。在您的示例中,整个类应该如下所示:

class GetTimestampDifference extends UDF {

  def evaluate(interval: java.lang.String, date1: java.sql.Timestamp, date2: java.sql.Timestamp) : java.lang.Long = {

  //https://docs.oracle.com/javase/8/docs/api/java/sql/Timestamp.html
  //https://spark.apache.org/docs/2.4.0/sql-reference.html
  //https://alvinalexander.com/scala/how-to-use-scala-match-expression-like-switch-case-statement

  interval match
  {
    case "NANOSECOND"=> return ChronoUnit.NANOS.between(date1.toInstant(), date2.toInstant());
    case "MICROSECOND"=> return ChronoUnit.MICROS.between(date1.toInstant(), date2.toInstant());
    case "MILLISECOND"=> return ChronoUnit.MILLIS.between(date1.toInstant(), date2.toInstant()); // date2.getTime() - date1.getTime();
    case "SECOND"=> return ChronoUnit.SECONDS.between(date1.toInstant(), date2.toInstant());
    case "MINUTE"=> return ChronoUnit.MINUTES.between(date1.toInstant(), date2.toInstant());
    case "HOUR"=> return ChronoUnit.HOURS.between(date1.toInstant(), date2.toInstant());
    case "DAY"=> return ChronoUnit.DAYS.between(date1.toInstant(), date2.toInstant());
    case "WEEK"=> return ChronoUnit.WEEKS.between(date1.toInstant(), date2.toInstant());
    case "MONTH"=> return ChronoUnit.MONTHS.between(date1.toInstant(), date2.toInstant());
    case "YEAR"=> return ChronoUnit.YEARS.between(date1.toInstant(), date2.toInstant());
  }
}

}

然后需要将其编译为JAR文件,将其复制到Databricks文件系统中的某个位置,并使用与前面相同的命令创建永久函数(假设您保留了IBAN示例的命名空间):

CREATE FUNCTION GetTimestampDifference AS 'com.ing.wbaa.spark.udf.GetTimestampDifference' USING JAR '[path to your jar in dbfs]'

SELECT GetTimestampDifference ("MILLISECOND",cast("2019-07-08 16:07:03.246" as timestamp), cast("2019-07-08 16:07:03.248" as timestamp))

假设您仍在修改开始时使用的IBAN示例项目,为了创建JAR文件,您必须将以下包依赖项添加到Build.sbt文件:

"org.apache.spark" %% "spark-hive" % "2.4.3"

这篇关于在Spark-SQL for Azure数据库中创建用户定义的(非临时)函数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆