How can I pass extra parameters to UDFs in Spark SQL?


Question

I want to parse the date columns in a DataFrame, and for each date column, the resolution for the date may change (i.e. 2011/01/10 => 2011/01 if the resolution is set to "Month").

I wrote the following code:

def convertDataFrame(dataframe: DataFrame, schema : Array[FieldDataType], resolution: Array[DateResolutionType]) : DataFrame =
{
  import org.apache.spark.sql.functions._
  val convertDateFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDate(x, resolution)}
  val convertDateTimeFunc = udf{(x:String, resolution: DateResolutionType) => SparkDateTimeConverter.convertDateTime(x, resolution)}

  val allColNames = dataframe.columns
  val allCols = allColNames.map(name => dataframe.col(name))

  val mappedCols =
  {
    for(i <- allCols.indices) yield
    {
      schema(i) match
      {
        case FieldDataType.Date => convertDateFunc(allCols(i), resolution(i))
        case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
        case _ => allCols(i)
      }
    }
  }

  dataframe.select(mappedCols:_*)

}

However, it doesn't work. It seems that I can only pass Columns to UDFs. And I wonder whether it would be very slow to convert the DataFrame to an RDD and apply the function to each row.

Does anyone know the correct solution? Thank you!

Answer

Just use a little bit of currying:

def convertDateFunc(resolution: DateResolutionType) = udf((x:String) => 
  SparkDateTimeConverter.convertDate(x, resolution))

and use it as follows:

case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))
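The currying trick works because `convertDateFunc(resolution(i))` runs on the driver and bakes the resolution into a closure, leaving a one-argument `String => String` function of exactly the shape `udf` needs. A minimal plain-Scala sketch of the same pattern (no Spark; the truncation logic here is a hypothetical stand-in for `SparkDateTimeConverter.convertDate`):

```scala
// Hypothetical stand-in for SparkDateTimeConverter.convertDate:
// truncate a "yyyy/MM/dd" string to the requested resolution.
def convertDate(date: String, resolution: String): String = resolution match {
  case "Year"  => date.take(4)  // "2011/01/10" -> "2011"
  case "Month" => date.take(7)  // "2011/01/10" -> "2011/01"
  case _       => date
}

// Curried factory: fix the resolution first, get back a
// one-argument function -- the shape udf(...) can wrap.
def convertDateFunc(resolution: String): String => String =
  (x: String) => convertDate(x, resolution)

val toMonth = convertDateFunc("Month")
println(toMonth("2011/01/10")) // prints 2011/01
```

In the Spark version the only difference is that the returned function is wrapped in `udf(...)`, so the resolution is captured per column at the time the UDF is built.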

On a side note, you should take a look at sql.functions.trunc and sql.functions.date_format. These should do at least part of the job without using UDFs at all.
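To illustrate the effect `trunc(col, "month")` has on a date column, here is the same month-truncation written with plain `java.time`, outside Spark (an illustration of the semantics only, not Spark's implementation):

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Input format used in the question: "yyyy/MM/dd".
val fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd")

// Truncate to the first day of the month, as trunc(..., "month") does.
def truncToMonth(s: String): String =
  LocalDate.parse(s, fmt).withDayOfMonth(1).toString

println(truncToMonth("2011/01/10")) // prints 2011-01-01
```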

Note:

In Spark 2.2 or later you can use the typedLit function:

import org.apache.spark.sql.functions.typedLit

which supports a wider range of literals, like Seq or Map.
