Find median in Spark SQL for multiple double datatype columns
Question
I have a requirement to find the median for multiple double datatype columns. I'd appreciate suggestions on the correct approach.
Below is my sample dataset with one column. I expect the median value returned for this sample to be 1.
scala> sqlContext.sql("select num from test").show();
+---+
|num|
+---+
|0.0|
|0.0|
|1.0|
|1.0|
|1.0|
|1.0|
+---+
I tried the options below:
1) Hive UDAF percentile, which worked only for BigInt.
2) Hive UDAF percentile_approx, but it does not work as expected (returns 0.25 instead of 1).
sqlContext.sql("select percentile_approx(num,0.5) from test").show();
sqlContext.sql("select percentile_approx(num,0.5) from test").show();
+----+
| _c0|
+----+
|0.25|
+----+
3) Spark window function percent_rank: the way I see to find the median is to look for all percent_rank values above 0.5 and pick the num value corresponding to the max percent_rank. But it does not work in all cases, especially when I have an even record count, in which case the median is the average of the two middle values in the sorted distribution.
Also with percent_rank, since I have to find the median for multiple columns, I have to calculate it in different dataframes, which to me is a somewhat complex method. Please correct me if my understanding is not right. (A small sketch of this window approach appears after the table below.)
+---+------------+
|num|percent_rank|
+---+------------+
|0.0|         0.0|
|0.0|         0.0|
|1.0|         0.4|
|1.0|         0.4|
|1.0|         0.4|
|1.0|         0.4|
+---+------------+
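For reference, here is a minimal pySpark sketch of that window approach (my illustration, not part of the original question; it assumes the test view and num column from the sample above):

from pyspark.sql import Window
import pyspark.sql.functions as F

# percent_rank over the whole table, ordered by the value column
w = Window.orderBy("num")
ranked = sqlContext.table("test").withColumn("percent_rank", F.percent_rank().over(w))
ranked.show()

As noted above, picking a single row by percent_rank cannot average the two middle values when the row count is even, so on its own it does not give the median in every case.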
Answer
Out of curiosity, which version of Apache Spark are you using? There were some fixes in Apache Spark 2.0+ which included changes to approxQuantile.
If I were to run the pySpark code snippet below:
rdd = sc.parallelize([[1, 0.0], [1, 0.0], [1, 1.0], [1, 1.0], [1, 1.0], [1, 1.0]])
df = rdd.toDF(['id', 'num'])
df.createOrReplaceTempView("df")
and calculate the median using approxQuantile:
df.approxQuantile("num", [0.5], 0.25)
or
spark.sql("select percentile_approx(num, 0.5) from df").show()
The results are:
- Spark 2.0.0: 0.25
- Spark 2.0.1: 1.0
- Spark 2.1.0: 1.0
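A side note, not part of the original answer: approxQuantile also takes a relativeError argument, and passing 0.0 is documented to compute exact quantiles at the cost of more work. A minimal sketch against the df defined above; the column name num2 is a hypothetical placeholder for the multi-column case:

# relativeError = 0.0 requests exact quantiles (can be expensive on large data)
exact_median = df.approxQuantile("num", [0.5], 0.0)[0]

# Newer Spark releases also accept a list of columns, which would cover the
# multi-column requirement in one call; "num2" is hypothetical here.
# medians = df.approxQuantile(["num", "num2"], [0.5], 0.0)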
Note that these are approximate numbers (via approxQuantile), though in general this should work well. If you need the exact median, one approach is to use numpy.median. The code snippet below is updated for this df example, based on gench's SO response to How to find the median in Apache Spark with Python Dataframe API?:
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
import numpy as np

def find_median(values):
    try:
        median = np.median(values)  # get the median of the collected list in each row
        return round(float(median), 2)
    except Exception:
        return None  # if there is anything wrong with the given values

median_finder = F.udf(find_median, FloatType())

df2 = df.groupBy("id").agg(F.collect_list("num").alias("nums"))
df2 = df2.withColumn("median", median_finder("nums"))

# print out
df2.show()
Output:
+---+--------------------+------+
| id| nums|median|
+---+--------------------+------+
| 1|[0.0, 0.0, 1.0, 1...| 1.0|
+---+--------------------+------+
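Since the original requirement is the median of multiple double columns, the same UDF can simply be reused per column. Below is a minimal sketch under the assumption of two hypothetical columns colA and colB (placeholder names, not from the original post) on a dataframe with the same id grouping; median_finder is the UDF defined above:

cols = ["colA", "colB"]
agg_exprs = [F.collect_list(c).alias(c + "_vals") for c in cols]
df3 = df.groupBy("id").agg(*agg_exprs)  # one collected list per column
for c in cols:
    df3 = df3.withColumn(c + "_median", median_finder(c + "_vals"))
df3.show()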
Updated: Spark 1.6 Scala version using RDDs
If you are using Spark 1.6, you can calculate the median using Scala code via Eugene Zhulenev's response to How can I calculate the exact median with Apache Spark?. Below is the modified code that works with our example.
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

val rdd: RDD[Double] = sc.parallelize(Seq(0.0, 0.0, 1.0, 1.0, 1.0, 1.0))

// Sort the values and key each one by its position in the sorted order.
val sorted = rdd.sortBy(identity).zipWithIndex().map {
  case (v, idx) => (idx, v)
}

val count = sorted.count()

// Even count: average the two middle values; odd count: take the middle one.
val median: Double = if (count % 2 == 0) {
  val l = count / 2 - 1
  val r = l + 1
  (sorted.lookup(l).head + sorted.lookup(r).head).toDouble / 2
} else sorted.lookup(count / 2).head.toDouble
Output:
import org.apache.spark.SparkContext._
rdd: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[227] at parallelize at <console>:34
sorted: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[234] at map at <console>:36
count: Long = 6
median: Double = 1.0
Note that this calculates the exact median using RDDs; that is, you will need to convert the DataFrame column into an RDD to perform this calculation.
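If you prefer to stay in pySpark, the same exact-median idea can be sketched by converting the column to an RDD first; this is my adaptation of the Scala logic above (a sketch reusing the df and num column defined earlier, not code from the original answer):

# pull the column out of the DataFrame as a plain RDD of doubles
num_rdd = df.select("num").rdd.map(lambda row: row[0])
# key every value by its position in sorted order
indexed = num_rdd.sortBy(lambda x: x).zipWithIndex().map(lambda kv: (kv[1], kv[0]))
n = indexed.count()
if n % 2 == 0:
    left = n // 2 - 1
    median = (indexed.lookup(left)[0] + indexed.lookup(left + 1)[0]) / 2.0
else:
    median = indexed.lookup(n // 2)[0]

For the sample data this returns 1.0, matching the expected median.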