How to use approxQuantile by group?
Question
Spark has the SQL function percentile_approx(), and its Scala counterpart is df.stat.approxQuantile().
However, the Scala counterpart cannot be used on grouped datasets, i.e. something like df.groupby("foo").stat.approxQuantile(), as answered here: https://stackoverflow.com/a/51933027.
But it is possible to do both grouping and percentiles in SQL syntax. So I'm wondering: maybe I can define a UDF from the SQL percentile_approx function and use it on my grouped dataset?
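For context, the SQL route mentioned above can be sketched as follows (a hypothetical illustration; the view name tbl and the columns foo and bar are placeholders, not names from the question):

```scala
// Hypothetical illustration: percentile_approx is a built-in SQL
// aggregate, so grouping and percentiles combine naturally in SQL.
// "tbl", "foo" and "bar" are placeholder names.
df.createOrReplaceTempView("tbl")
spark.sql(
  "SELECT foo, percentile_approx(bar, 0.5) AS median FROM tbl GROUP BY foo"
).show
```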
Solution
Spark >= 3.1
The corresponding SQL functions were added in Spark 3.1 - see SPARK-30569.
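With those built-ins, the grouped query from the example further down in this answer can be written without any custom code on Spark >= 3.1 (a sketch reusing the df, group and value names defined below):

```scala
// Spark >= 3.1 only: percentile_approx is exposed directly in
// org.apache.spark.sql.functions, so no custom wrapper is needed.
// df, "group" and "value" refer to the example DataFrame built
// later in this answer.
import org.apache.spark.sql.functions.{percentile_approx, lit}

df.groupBy($"group")
  .agg(percentile_approx($"value", lit(0.5), lit(10000)))
  .show
```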
Spark < 3.1
While you cannot use approxQuantile in a UDF, and there is no Scala wrapper for percentile_approx, it is not hard to implement one yourself:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile

object PercentileApprox {
  def percentile_approx(col: Column, percentage: Column, accuracy: Column): Column = {
    val expr = new ApproximatePercentile(
      col.expr, percentage.expr, accuracy.expr
    ).toAggregateExpression
    new Column(expr)
  }

  def percentile_approx(col: Column, percentage: Column): Column =
    percentile_approx(col, percentage, lit(ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY))
}
Example usage:
import PercentileApprox._

val df = (Seq.fill(100)("a") ++ Seq.fill(100)("b")).toDF("group").withColumn(
  "value", when($"group" === "a", randn(1) + 10).otherwise(randn(3))
)

df.groupBy($"group").agg(percentile_approx($"value", lit(0.5))).show
+-----+------------------------------------+
|group|percentile_approx(value, 0.5, 10000)|
+-----+------------------------------------+
| b| -0.06336346702250675|
| a| 9.818985618591595|
+-----+------------------------------------+
df.groupBy($"group").agg(
  percentile_approx($"value", typedLit(Seq(0.1, 0.25, 0.75, 0.9)))
).show(false)
+-----+----------------------------------------------------------------------------------+
|group|percentile_approx(value, [0.1,0.25,0.75,0.9], 10000) |
+-----+----------------------------------------------------------------------------------+
|b |[-1.2098351202406483, -0.6640768986666159, 0.6778253126144265, 1.3255676906697658]|
|a |[8.902067202468098, 9.290417382259626, 10.41767257153993, 11.067087075488068] |
+-----+----------------------------------------------------------------------------------+
Once this is on the JVM classpath, you can also add a PySpark wrapper, using logic similar to that of the built-in functions.
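One way such a wrapper might look (a sketch, assuming the Scala PercentileApprox object above has been compiled and placed on the driver's classpath; _to_java_column is the same private helper the built-in pyspark.sql.functions wrappers use, so it may change between versions):

```python
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.functions import lit


def percentile_approx(col, percentage, accuracy=10000):
    """Sketch of a PySpark wrapper for the Scala PercentileApprox
    object defined above (assumed to be on the JVM classpath)."""
    sc = SparkContext._active_spark_context
    if not isinstance(accuracy, Column):
        accuracy = lit(accuracy)
    # Call the Scala helper through the Py4J gateway, mirroring how
    # the built-in pyspark.sql.functions wrappers delegate to the JVM.
    jc = sc._jvm.PercentileApprox.percentile_approx(
        _to_java_column(col),
        _to_java_column(percentage),
        _to_java_column(accuracy),
    )
    return Column(jc)
```

It would then be used the same way as the Scala version, e.g. df.groupBy("group").agg(percentile_approx(df["value"], lit(0.5))).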