How can I pass extra parameters to UDFs in Spark SQL?
Problem Description
I want to parse the date columns in a DataFrame, and for each date column the resolution may change (e.g. 2011/01/10 => 2011/01 if the resolution is set to "Month").
I wrote the following code:
def convertDataFrame(dataframe: DataFrame,
                     schema: Array[FieldDataType],
                     resolution: Array[DateResolutionType]): DataFrame = {
  import org.apache.spark.sql.functions._
  val convertDateFunc = udf { (x: String, resolution: DateResolutionType) =>
    SparkDateTimeConverter.convertDate(x, resolution)
  }
  val convertDateTimeFunc = udf { (x: String, resolution: DateResolutionType) =>
    SparkDateTimeConverter.convertDateTime(x, resolution)
  }
  val allColNames = dataframe.columns
  val allCols = allColNames.map(name => dataframe.col(name))
  val mappedCols =
    for (i <- allCols.indices) yield schema(i) match {
      case FieldDataType.Date     => convertDateFunc(allCols(i), resolution(i))
      case FieldDataType.DateTime => convertDateTimeFunc(allCols(i), resolution(i))
      case _                      => allCols(i)
    }
  dataframe.select(mappedCols: _*)
}
However, it doesn't work. It seems that I can only pass Columns to UDFs. And I wonder whether it would be very slow if I converted the DataFrame to an RDD and applied the function to each row.
Does anyone know the correct solution? Thank you!
Recommended Answer
All it takes is a little trick: curry the function, so the non-Column parameter is fixed before the UDF is created:
def convertDateFunc(resolution: DateResolutionType) = udf((x: String) =>
  SparkDateTimeConverter.convertDate(x, resolution))
and use it as follows:
case FieldDataType.Date => convertDateFunc(resolution(i))(allCols(i))
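The pattern above — an outer function that takes the extra parameter and returns a udf whose closure captures it — can be sketched in a self-contained form. Everything below (the column name, the sample data, and the string-based resolution logic) is made up for illustration; it is not the question's SparkDateTimeConverter:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object CurriedUdfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("curried-udf").getOrCreate()
    import spark.implicits._

    // The "extra parameter" (resolution) is captured by the closure when the
    // UDF is created, so the UDF itself only ever receives Columns.
    def truncateDate(resolution: String) = udf { (date: String) =>
      resolution match {
        case "Month" => date.take(7) // "2011/01/10" -> "2011/01"
        case "Year"  => date.take(4) // "2011/01/10" -> "2011"
        case _       => date
      }
    }

    val df = Seq("2011/01/10", "2012/05/31").toDF("date")
    df.select(truncateDate("Month")($"date").as("date")).show()

    spark.stop()
  }
}
```

Because `truncateDate("Month")` is an ordinary Scala call, you can pick the resolution per column at plan-building time, exactly as the `schema(i) match` loop in the question does.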
On a side note, you should take a look at sql.functions.trunc and sql.functions.date_format. These should handle at least part of the job without using UDFs at all.
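For instance, assuming the dates parse as standard yyyy-MM-dd values (the column name and sample data below are made up), the "Month" resolution from the question can be expressed with built-ins alone:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{date_format, to_date, trunc}

object BuiltinDateExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("builtin-dates").getOrCreate()
    import spark.implicits._

    val df = Seq("2011-01-10", "2012-05-31").toDF("date")

    df.select(
      // date_format re-renders the date at month resolution: "2011-01-10" -> "2011-01"
      date_format(to_date($"date"), "yyyy-MM").as("month_str"),
      // trunc keeps a DateType but snaps it to the first of the month: 2011-01-10 -> 2011-01-01
      trunc(to_date($"date"), "MM").as("month_date")
    ).show()

    spark.stop()
  }
}
```

Built-in functions like these stay inside Catalyst, so they avoid the serialization overhead a UDF incurs.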
Note:

In Spark 2.2 or later you can use the typedLit function:

import org.apache.spark.sql.functions.typedLit

which supports a wider range of literals like Seq or Map.
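As a sketch of the difference (the column names and values below are hypothetical): lit covers simple scalars, while typedLit can also turn a Seq or Map into a literal Column, which is another way to hand a constant "extra parameter" to an expression:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, typedLit}

object TypedLitExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("typedlit").getOrCreate()
    import spark.implicits._

    val df = Seq(1, 2).toDF("id")

    df.select(
      $"id",
      lit("scalar").as("s"),                          // plain literal: scalars only
      typedLit(Seq(1, 2, 3)).as("seq_col"),           // array literal (Spark 2.2+)
      typedLit(Map("a" -> 1, "b" -> 2)).as("map_col") // map literal (Spark 2.2+)
    ).show(false)

    spark.stop()
  }
}
```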