Find median in Spark SQL for multiple double datatype columns

Problem Description

I have a requirement to find the median for multiple double datatype columns. Please suggest the correct approach.

Below is my sample dataset with one column. I am expecting the median value to be returned as 1 for this sample.

  scala> sqlContext.sql("select num from test").show();
+---+
|num|
+---+
|0.0|
|0.0|
|1.0|
|1.0|
|1.0|
|1.0|
+---+

I tried the following options:

1) Hive UDAF percentile: it worked only for BigInt.

2) Hive UDAF percentile_approx: it does not work as expected (returns 0.25 instead of 1).

sqlContext.sql("select percentile_approx(num,0.5) from test").show();

+----+
| _c0|
+----+
|0.25|
+----+

3) Spark window function percent_rank: the way I see to find the median is to look for all percent_rank values above 0.5 and pick the num value corresponding to the maximum percent_rank. But this does not work in all cases, especially when I have an even record count, in which case the median is the average of the two middle values in the sorted distribution.

Also, with percent_rank, since I have to find the median for multiple columns, I have to calculate it in separate dataframes, which seems to me a somewhat complex method (a rough sketch of this approach follows the table below). Please correct me if my understanding is not right.

+---+------------+
|num|percent_rank|
+---+------------+
|0.0|         0.0|
|0.0|         0.0|
|1.0|         0.4|
|1.0|         0.4|
|1.0|         0.4|
|1.0|         0.4|
+---+------------+
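
For illustration, here is a minimal pySpark sketch of this percent_rank approach on the same sample values (assuming an existing sqlContext; the sample DataFrame and window spec are only for demonstration):

from pyspark.sql import Window
import pyspark.sql.functions as F

# Recreate the sample column and compute percent_rank over a global ordering.
sample = sqlContext.createDataFrame([(x,) for x in [0.0, 0.0, 1.0, 1.0, 1.0, 1.0]], ["num"])
w = Window.orderBy("num")
sample.withColumn("percent_rank", F.percent_rank().over(w)).show()

# With an even row count no row lands exactly at percent_rank 0.5, so extra
# logic is still needed to average the two middle values.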

Recommended Answer

Out of curiosity, which version of Apache Spark are you using? There were some fixes in Apache Spark 2.0+ that included changes to approxQuantile.

If I were to run the pySpark code snippet below:

rdd = sc.parallelize([[1, 0.0], [1, 0.0], [1, 1.0], [1, 1.0], [1, 1.0], [1, 1.0]])
df = rdd.toDF(['id', 'num'])
df.createOrReplaceTempView("df")

and then calculate the median with approxQuantile as well as percentile_approx:

df.approxQuantile("num", [0.5], 0.25)

spark.sql("select percentile_approx(num, 0.5) from df").show()

the results are:

  • Spark 2.0.0: 0.25
  • Spark 2.0.1: 1.0
  • Spark 2.1.0: 1.0

Note that these are approximate numbers (via approxQuantile), though in general this should work well. If you need the exact median, one approach is to use numpy.median. The code snippet below is updated for this df example, based on gench's SO response to How to find the median in Apache Spark with Python Dataframe API?:

from pyspark.sql.types import *
import pyspark.sql.functions as F
import numpy as np

def find_median(values):
    try:
        median = np.median(values) #get the median of values in a list in each row
        return round(float(median),2)
    except Exception:
        return None #if there is anything wrong with the given values

median_finder = F.udf(find_median,FloatType())

df2 = df.groupBy("id").agg(F.collect_list("num").alias("nums"))
df2 = df2.withColumn("median", median_finder("nums"))

# print out
df2.show()

The output is:

+---+--------------------+------+
| id|                nums|median|
+---+--------------------+------+
|  1|[0.0, 0.0, 1.0, 1...|   1.0|
+---+--------------------+------+
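
Since the original requirement is to find the median for multiple double columns, the same median_finder UDF can be applied column by column. Below is a minimal sketch that builds on the snippet above, assuming hypothetical extra double columns named num2 and num3:

# Hypothetical list of double columns; num2 and num3 stand in for the real column names.
double_cols = ["num", "num2", "num3"]

# Collect each column into a per-group list, then apply median_finder to each list.
medians_df = df.groupBy("id").agg(*[F.collect_list(c).alias(c + "_list") for c in double_cols])
for c in double_cols:
    medians_df = medians_df.withColumn(c + "_median", median_finder(c + "_list"))

medians_df.show()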

Updated: Spark 1.6 Scala version using RDDs

If you are using Spark 1.6, you can calculate the median using Scala code via Eugene Zhulenev's response to How can I calculate the exact median with Apache Spark. Below is the modified code that works with our example.

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

  val rdd: RDD[Double] = sc.parallelize(Seq((0.0), (0.0), (1.0), (1.0), (1.0), (1.0)))

  val sorted = rdd.sortBy(identity).zipWithIndex().map {
    case (v, idx) => (idx, v)
  }

  val count = sorted.count()

  val median: Double = if (count % 2 == 0) {
    val l = count / 2 - 1
    val r = l + 1
    (sorted.lookup(l).head + sorted.lookup(r).head).toDouble / 2
  } else sorted.lookup(count / 2).head.toDouble

The output is:

// output
import org.apache.spark.SparkContext._
rdd: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[227] at parallelize at <console>:34
sorted: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[234] at map at <console>:36
count: Long = 6
median: Double = 1.0

Note, this calculates the exact median using RDDs, i.e. you will need to convert the DataFrame column into an RDD to perform this calculation.
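
For reference, a rough pySpark sketch of that conversion using the df defined earlier: pull the num column out of the DataFrame as an RDD of doubles, sort and index it, and look up the middle element(s).

# Convert the DataFrame column into an RDD of doubles.
num_rdd = df.select("num").rdd.map(lambda row: float(row[0]))

# Sort, attach positional indices, and key by index so lookup() can fetch by position.
indexed = num_rdd.sortBy(lambda x: x).zipWithIndex().map(lambda pair: (pair[1], pair[0]))
count = indexed.count()

if count % 2 == 0:
    left = count // 2 - 1
    median = (indexed.lookup(left)[0] + indexed.lookup(left + 1)[0]) / 2.0
else:
    median = indexed.lookup(count // 2)[0]

print(median)  # 1.0 for the sample values above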
