How to find exact median for grouped data in Spark
Question
I have a requirement to calculate the exact median on a grouped data set of Double datatype in Spark using Scala.
It is different from the similar question Find median in Spark SQL for multiple double datatype columns: this question is about finding the median for grouped data, whereas the other one is about finding the median at the RDD level.
Here is my sample data:
scala> sqlContext.sql("select * from test").show()
+---+---+
| id|num|
+---+---+
| A|0.0|
| A|1.0|
| A|1.0|
| A|1.0|
| A|0.0|
| A|1.0|
| B|0.0|
| B|1.0|
| B|1.0|
+---+---+
Expected answer:
+--------+
| Median |
+--------+
| 1 |
| 1 |
+--------+
I tried the following options, but no luck:
1) Hive function percentile: it worked only for BigInt.
2) Hive function percentile_approx: it does not work as expected (returns 0.25 vs 1).
scala> sqlContext.sql("select percentile_approx(num, 0.5) from test group by id").show()
+----+
| _c0|
+----+
|0.25|
|0.25|
+----+
Answer
Simplest approach (requires Spark 2.0.1+; not an exact median)
As noted in the comments on the first question, Find median in Spark SQL for double datatype columns, we can use percentile_approx to calculate the median in Spark 2.0.1+. To apply this to grouped data in Apache Spark, the query would look like:
val df = Seq(("A", 0.0), ("A", 0.0), ("A", 1.0), ("A", 1.0), ("A", 1.0), ("A", 1.0), ("B", 0.0), ("B", 1.0), ("B", 1.0)).toDF("id", "num")
df.createOrReplaceTempView("df")
spark.sql("select id, percentile_approx(num, 0.5) as median from df group by id order by id").show()
with the output being:
+---+------+
| id|median|
+---+------+
| A| 1.0|
| B| 1.0|
+---+------+
That said, this is an approximate value (as opposed to the exact median the question asks for).
There are multiple approaches, so I'm sure others on SO can provide better or more efficient examples. But here's a code snippet to calculate the median for grouped data in Spark (verified in Spark 1.6 and Spark 2.1):
import org.apache.spark.rdd.RDD

val rdd: RDD[(String, Double)] = sc.parallelize(Seq(("A", 1.0), ("A", 0.0), ("A", 1.0), ("A", 1.0), ("A", 0.0), ("A", 1.0), ("B", 0.0), ("B", 1.0), ("B", 1.0)))

// Scala median function (expects a sorted list)
def median(inputList: List[Double]): Double = {
  val count = inputList.size
  if (count % 2 == 0) {
    // Even count: average the two middle values
    val l = count / 2 - 1
    val r = l + 1
    (inputList(l) + inputList(r)) / 2
  } else {
    // Odd count: take the middle value
    inputList(count / 2)
  }
}

// Group the values by key and sort each group's values
val setRDD = rdd.groupByKey()
val sortedListRDD = setRDD.mapValues(_.toList.sorted)

// Output DataFrame of id and median
sortedListRDD.map(m => (m._1, median(m._2))).toDF("id", "median_of_num").show()
with the output being:
+---+-------------+
| id|median_of_num|
+---+-------------+
| A| 1.0|
| B| 1.0|
+---+-------------+
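The median helper above assumes its input is already sorted and interpolates between the two middle elements when the count is even. A quick standalone sanity check in plain Scala (no Spark needed; the lists below just mirror the sample data):

```scala
// Standalone copy of the median helper for testing (assumes a sorted input list)
def median(inputList: List[Double]): Double = {
  val count = inputList.size
  if (count % 2 == 0) {
    val l = count / 2 - 1
    (inputList(l) + inputList(l + 1)) / 2
  } else {
    inputList(count / 2)
  }
}

// Even-sized group: id A from the sample data, sorted first
println(median(List(1.0, 0.0, 1.0, 1.0, 0.0, 1.0).sorted)) // 1.0
// Odd-sized group: id B from the sample data
println(median(List(0.0, 1.0, 1.0).sorted))                // 1.0
// Classic interpolation case
println(median(List(1.0, 2.0, 3.0, 4.0)))                  // 2.5
```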
There are some caveats I should call out, as this likely isn't the most efficient implementation:
- It's currently using a groupByKey, which is not very performant. You may want to change this to a reduceByKey instead (more information at Avoid GroupByKey).
- It uses a Scala function to calculate the median.
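Setting the Spark machinery aside, the grouped-median logic itself can be checked end to end on plain Scala collections; the sketch below is a local model of the pipeline (groupBy standing in for groupByKey, so it does not address the performance caveat above):

```scala
// Same median helper as in the Spark snippet (expects a sorted list)
def median(inputList: List[Double]): Double = {
  val count = inputList.size
  if (count % 2 == 0) (inputList(count / 2 - 1) + inputList(count / 2)) / 2
  else inputList(count / 2)
}

// The sample data from the question, as a local collection
val data = Seq(("A", 1.0), ("A", 0.0), ("A", 1.0), ("A", 1.0), ("A", 0.0), ("A", 1.0),
               ("B", 0.0), ("B", 1.0), ("B", 1.0))

// groupBy + mapping mirrors groupByKey + mapValues in the RDD version
val medians = data.groupBy(_._1).map { case (id, rows) =>
  id -> median(rows.map(_._2).toList.sorted)
}

println(medians("A")) // 1.0
println(medians("B")) // 1.0
```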
This approach should work fine for smaller amounts of data, but if you have millions of rows per key, I would advise using Spark 2.0.1+ and the percentile_approx approach.