Finding Percentile in Spark-Scala per a group

Problem Description

I am trying to compute a percentile over a column using a Window function, as below. I have referred here to the ApproxQuantile definition in order to use it over a group.

val df1 = Seq(
    (1, 10.0), (1, 20.0), (1, 40.6), (1, 15.6), (1, 17.6), (1, 25.6),
    (1, 39.6), (2, 20.5), (2 ,70.3), (2, 69.4), (2, 74.4), (2, 45.4),
    (3, 60.6), (3, 80.6), (4, 30.6), (4, 90.6)
).toDF("ID","Count")

val idBucketMapping = Seq((1, 4), (2, 3), (3, 2), (4, 2))
    .toDF("ID", "Bucket")

//jpp
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, typedLit, when}

object PercentileApprox {
    // Wraps Catalyst's ApproximatePercentile so it can be used as a Column
    // expression instead of only through SQL
    def percentile_approx(col: Column, percentage: Column,
                          accuracy: Column): Column = {
        val expr = new ApproximatePercentile(
            col.expr, percentage.expr, accuracy.expr
        ).toAggregateExpression
        new Column(expr)
    }

    def percentile_approx(col: Column, percentage: Column): Column =
        percentile_approx(col, percentage,
            lit(ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY))
}
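
// Note: on Spark 3.1+ this wrapper should be unnecessary, since
// org.apache.spark.sql.functions.percentile_approx (taking column,
// percentage and accuracy arguments) is available directly; on older
// versions the function is only exposed in SQL, hence the object above.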

import PercentileApprox._

// Lower boundaries of bucket_size equal-width percentile buckets
def doBucketing(bucket_size : Int) = (1 until bucket_size)
    .scanLeft(0d)((a, _) => a + (1 / bucket_size.toDouble))

val res = df1
    .withColumn("percentile",
        percentile_approx(col("Count"), typedLit(doBucketing(2)))
            .over(Window.partitionBy("ID"))
    )
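
For instance, the boundary lists this helper produces (the lower edge of each of the n equal-width buckets):

    doBucketing(2)   // Vector(0.0, 0.5)
    doBucketing(4)   // Vector(0.0, 0.25, 0.5, 0.75)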

scala> df1.show
+---+-----+
| ID|Count|
+---+-----+
|  1| 10.0|
|  1| 20.0|
|  1| 40.6|
|  1| 15.6|
|  1| 17.6|
|  1| 25.6|
|  1| 39.6|
|  2| 20.5|
|  2| 70.3|
|  2| 69.4|
|  2| 74.4|
|  2| 45.4|
|  3| 60.6|
|  3| 80.6|
|  4| 30.6|
|  4| 90.6|
+---+-----+


scala> idBucketMapping.show
+---+------+
| ID|Bucket|
+---+------+
|  1|     4|
|  2|     3|
|  3|     2|
|  4|     2|
+---+------+


scala> res.show
+---+-----+------------------+
| ID|Count|        percentile|
+---+-----+------------------+
|  1| 10.0|[10.0, 20.0, 40.6]|
|  1| 20.0|[10.0, 20.0, 40.6]|
|  1| 40.6|[10.0, 20.0, 40.6]|
|  1| 15.6|[10.0, 20.0, 40.6]|
|  1| 17.6|[10.0, 20.0, 40.6]|
|  1| 25.6|[10.0, 20.0, 40.6]|
|  1| 39.6|[10.0, 20.0, 40.6]|
|  3| 60.6|[60.6, 60.6, 80.6]|
|  3| 80.6|[60.6, 60.6, 80.6]|
|  4| 30.6|[30.6, 30.6, 90.6]|
|  4| 90.6|[30.6, 30.6, 90.6]|
|  2| 20.5|[20.5, 69.4, 74.4]|
|  2| 70.3|[20.5, 69.4, 74.4]|
|  2| 69.4|[20.5, 69.4, 74.4]|
|  2| 74.4|[20.5, 69.4, 74.4]|
|  2| 45.4|[20.5, 69.4, 74.4]|
+---+-----+------------------+

Up to here it is all well and good, and the logic is simple. But I need the results in a dynamic fashion: the argument to doBucketing (hard-coded to 2 above) should be taken from idBucketMapping based on the ID value.

This seems a little bit tricky to me. Is this possible by any means?

Expected output, where the percentile buckets are based on the idBucketMapping DataFrame:

+---+-----+------------------------+
|ID |Count|percentile              |
+---+-----+------------------------+
|1  |10.0 |[10.0, 15.6, 20.0, 39.6]|
|1  |20.0 |[10.0, 15.6, 20.0, 39.6]|
|1  |40.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |15.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |17.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |25.6 |[10.0, 15.6, 20.0, 39.6]|
|1  |39.6 |[10.0, 15.6, 20.0, 39.6]|
|3  |60.6 |[60.6, 60.6]            |
|3  |80.6 |[60.6, 60.6]            |
|4  |30.6 |[30.6, 30.6]            |
|4  |90.6 |[30.6, 30.6]            |
|2  |20.5 |[20.5, 45.4, 70.3]      |
|2  |70.3 |[20.5, 45.4, 70.3]      |
|2  |69.4 |[20.5, 45.4, 70.3]      |
|2  |74.4 |[20.5, 45.4, 70.3]      |
|2  |45.4 |[20.5, 45.4, 70.3]      |
+---+-----+------------------------+

Answer

I have a solution for you that is extremely inelegant, and it works only if you have a limited number of possible bucket counts.

My first version is very ugly.

// for the sake of clarity, let's define a function that generates the
// window aggregation for a given bucket count
def per(x : Int) = percentile_approx(col("Count"), typedLit(doBucketing(x)))
                        .over(Window.partitionBy("ID"))

// then, we simply try to match the Bucket column with a possible value
val res = df1
    .join(idBucketMapping, Seq("ID"))
    .withColumn("percentile",
        when('Bucket === 2, per(2))
            .otherwise(when('Bucket === 3, per(3))
            .otherwise(per(4)))
    )

That's nasty, but it works in your case. Slightly less ugly, but with the very same logic, you can define a set of possible bucket counts and use it to do the same thing as above.

val possible_number_of_buckets = 2 to 5

// fold over the candidate bucket counts, building one chained
// when/otherwise column expression
val res = df1
    .join(idBucketMapping, Seq("ID"))
    .withColumn("percentile",
        possible_number_of_buckets.tail
            .foldLeft(per(possible_number_of_buckets.head)) {
                (column, size) => when('Bucket === size, per(size)).otherwise(column)
            })
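
If the set of possible bucket counts is not known ahead of time, here is a sketch of a fully dynamic variant. It assumes idBucketMapping is small and non-empty, so its distinct Bucket values can be collected to the driver; resDynamic is a hypothetical name.

    // Sketch: derive the candidate bucket counts at runtime instead of
    // hard-coding a range; assumes idBucketMapping is small and non-empty
    val bucketSizes = idBucketMapping
        .select("Bucket").distinct
        .as[Int].collect.toSeq

    val resDynamic = df1
        .join(idBucketMapping, Seq("ID"))
        .withColumn("percentile",
            bucketSizes.tail.foldLeft(per(bucketSizes.head)) {
                (column, size) => when('Bucket === size, per(size)).otherwise(column)
            })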
