How can I define and use a User-Defined Aggregate Function in Spark SQL?
Question
I know how to write a UDF in Spark SQL:
def belowThreshold(power: Int): Boolean = {
  power < -40
}
sqlContext.udf.register("belowThreshold", belowThreshold _)
Can I do something similar to define an aggregate function? How is this done?
For context, I want to run the following SQL query:
val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
FROM ifDF
WHERE opticalReceivePower IS NOT null
GROUP BY span, timestamp
ORDER BY span""")
It should return something like
Row(span1, false, T0)
I want the aggregate function to tell me whether any values of opticalReceivePower in the groups defined by span and timestamp are below the threshold. Do I need to write my UDAF differently from the UDF I pasted above?
Recommended answer
Spark <= 1.4
As far as I know, at this moment (Spark 1.4.1), there is no support for UDAFs other than the Hive ones. It should be possible with Spark 1.5 (see SPARK-3947).
Spark 1.4 workaround
I am not sure I understand your requirements correctly, but as far as I can tell, plain old aggregation should be enough here:
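import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.types.IntegerType
import sqlContext.implicits._ // for toDF and the $"..." column syntax

val df = sc.parallelize(Seq(
  ("a", 0), ("a", 1), ("b", 30), ("b", -50))).toDF("group", "power")
df
  // 1 if the reading is below the threshold, 0 otherwise
  .withColumn("belowThreshold", ($"power".lt(-40)).cast(IntegerType))
  .groupBy($"group")
  // a non-zero sum means at least one reading in the group was below the threshold
  .agg(sum($"belowThreshold").notEqual(0).alias("belowThreshold"))
  .show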
// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// |    a|         false|
// |    b|          true|
// +-----+--------------+
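Since the question asks for a SQL query, the same workaround can probably be written in SQL as well. The following is only a sketch under assumptions: it presumes a spark-shell session where `sc` and `sqlContext` already exist, the sample rows are made up, and the timestamp column is named `ts` (a hypothetical name chosen here to avoid any clash with the SQL type name).

```scala
// Assumes a spark-shell session where `sc` and `sqlContext` are predefined.
import sqlContext.implicits._

// Hypothetical sample rows standing in for the asker's ifDF table.
val ifDF = sc.parallelize(Seq(
  ("span1", 0L, -10), ("span1", 0L, -20),
  ("span2", 0L, -30), ("span2", 0L, -50)
)).toDF("span", "ts", "opticalReceivePower")
ifDF.registerTempTable("ifDF")

// Count the readings below the threshold per group; a positive count
// means at least one reading in that group was below -40.
val aggDF = sqlContext.sql("""
  SELECT span, ts,
         SUM(CASE WHEN opticalReceivePower < -40 THEN 1 ELSE 0 END) > 0 AS belowThreshold
  FROM ifDF
  WHERE opticalReceivePower IS NOT NULL
  GROUP BY span, ts
  ORDER BY span""")

aggDF.show
```

This keeps everything in built-in aggregates, so no UDAF support is needed.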
Spark >= 1.5
In Spark 1.5 you can create a UDAF like this, although it is most likely overkill:
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
object belowThreshold extends UserDefinedAggregateFunction {
  // Schema you get as an input
  def inputSchema = new StructType().add("power", IntegerType)
  // Schema of the row which is used for aggregation
  def bufferSchema = new StructType().add("ind", BooleanType)
  // Returned type
  def dataType = BooleanType
  // Self-explaining
  def deterministic = true
  // zero value
  def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, false)
  // Similar to seqOp in aggregate
  def update(buffer: MutableAggregationBuffer, input: Row) = {
    if (!input.isNullAt(0))
      buffer.update(0, buffer.getBoolean(0) | input.getInt(0) < -40)
  }
  // Similar to combOp in aggregate
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
    buffer1.update(0, buffer1.getBoolean(0) | buffer2.getBoolean(0))
  }
  // Called on exit to get return value
  def evaluate(buffer: Row) = buffer.getBoolean(0)
}
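The comments in the UDAF compare update and merge to seqOp and combOp. To make that analogy concrete, here is a plain-Scala sketch with no Spark involved. The object name `BelowThresholdSketch` and its helpers are hypothetical, and nulls are modelled as `Option[Int]`; it only illustrates the fold-then-combine shape and is not how Spark executes the aggregation internally.

```scala
object BelowThresholdSketch {
  val Threshold = -40 // same cutoff as the UDAF above

  // Plays the role of `update`: fold one (possibly missing) reading into the buffer.
  def seqOp(buffer: Boolean, power: Option[Int]): Boolean =
    buffer | power.exists(_ < Threshold)

  // Plays the role of `merge`: combine two partial buffers.
  def combOp(left: Boolean, right: Boolean): Boolean = left | right

  // Each inner Seq stands in for one partition of a single group.
  // `false` is the zero value, matching `initialize`.
  def anyBelow(partitions: Seq[Seq[Option[Int]]]): Boolean =
    partitions.map(_.foldLeft(false)(seqOp)).foldLeft(false)(combOp)
}

// Group "b" from the sample data, split over two made-up partitions.
val groupB = BelowThresholdSketch.anyBelow(Seq(Seq(Some(30)), Seq(None, Some(-50))))
// Group "a" in a single partition.
val groupA = BelowThresholdSketch.anyBelow(Seq(Seq(Some(0), Some(1))))
```

Because OR is associative and `false` is its identity, the result does not depend on how the rows are split across partitions, which is the property merge relies on.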
Example usage:
df
.groupBy($"group")
.agg(belowThreshold($"power").alias("belowThreshold"))
.show
// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// |    a|         false|
// |    b|          true|
// +-----+--------------+
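To call the UDAF from a SQL query, as in the question, it should be enough to register it by name, much like the scalar UDF. This is a sketch that assumes Spark 1.5+, a spark-shell session with `sqlContext` available, and that `df` and the `belowThreshold` object from this answer are already defined. The temp table name `df` is an arbitrary choice, and `group` is backquoted because it is a SQL keyword.

```scala
// Assumes `df` and the `belowThreshold` UDAF object are already defined in the session.
df.registerTempTable("df")

// UDFRegistration.register has an overload that accepts a UserDefinedAggregateFunction.
sqlContext.udf.register("belowThreshold", belowThreshold)

val viaSql = sqlContext.sql("""
  SELECT `group`, belowThreshold(power) AS belowThreshold
  FROM df
  GROUP BY `group`""")

viaSql.show
```

With the asker's table, the same pattern would apply to opticalReceivePower grouped by span and timestamp.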