Find median in Spark SQL for multiple double datatype columns


Problem Description

I have a requirement to find the median for multiple double datatype columns. I would appreciate suggestions on the correct approach.

Below is my sample dataset with one column. I am expecting the median value to be returned as 1 for my sample.

  scala> sqlContext.sql("select num from test").show();
+---+
|num|
+---+
|0.0|
|0.0|
|1.0|
|1.0|
|1.0|
|1.0|
+---+

I tried the below options:

1) Hive UDAF percentile, which worked only for BigInt.

2) Hive UDAF percentile_approx, but it does not work as expected (it returns 0.25 instead of 1).

sqlContext.sql("select percentile_approx(num,0.5) from test").show();

sqlContext.sql("select percentile_approx(num,0.5) from test").show();

+----+
| _c0|
+----+
|0.25|
+----+
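One knob worth trying here: percentile_approx accepts an optional third argument for the approximation accuracy (the number of histogram bins in the Hive UDAF, 10000 by default), so a larger value can tighten the estimate. On the affected Spark builds discussed in the answer below the result may still be off, since the 0.25 appears to come from a fix-level issue rather than accuracy alone:

# hedged sketch: raise the bin count above the Hive default of 10000
sqlContext.sql("select percentile_approx(num, 0.5, 100000) from test").show();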

3) Spark window function percent_rank: the way I see to find the median is to look for all percent_rank values above 0.5 and pick the num value corresponding to the maximum percent_rank. But it does not work in all cases, especially with even record counts, where the median is the average of the two middle values in the sorted distribution.

Also, with percent_rank, since I have to find the median for multiple columns, I would have to calculate it in separate dataframes, which to me is a somewhat complex method (a sketch of this approach follows the table below). Please correct me if my understanding is not right.

+---+------------+
|num|percent_rank|
+---+------------+
|0.0|         0.0|
|0.0|         0.0|
|1.0|         0.4|
|1.0|         0.4|
|1.0|         0.4|
|1.0|         0.4|
+---+------------+
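For reference, here is a minimal pySpark sketch of the percent_rank approach described above (assuming the same test table is registered); it returns 1.0 for this sample, but the comments mark where even record counts without ties would still need the average of the two middle values:

from pyspark.sql import Window
import pyspark.sql.functions as F

# percent_rank of each row over the ascending sort of num
w = Window.orderBy("num")
ranked = sqlContext.table("test").withColumn("percent_rank", F.percent_rank().over(w))

# one reading of the approach: the largest num whose percent_rank <= 0.5;
# correct here, but for even counts without ties the true median is the
# average of the two middle values, which this cannot return
ranked.where(F.col("percent_rank") <= 0.5) \
      .orderBy(F.desc("percent_rank")) \
      .select("num").show(1)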

Answer

Out of curiosity, which version of Apache Spark are you using? There were some fixes within Apache Spark 2.0+ which included changes to approxQuantile.

If I was to run the pySpark code snippet below:

rdd = sc.parallelize([[1, 0.0], [1, 0.0], [1, 1.0], [1, 1.0], [1, 1.0], [1, 1.0]])
df = rdd.toDF(['id', 'num'])
df.createOrReplaceTempView("df")

and calculate the median with either approxQuantile or percentile_approx:

df.approxQuantile("num", [0.5], 0.25)

spark.sql("select percentile_approx(num, 0.5) from df").show()

the results are:

  • Spark 2.0.0: 0.25
  • Spark 2.0.1: 1.0
  • Spark 2.1.0: 1.0
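Since the question asks about multiple double columns: from Spark 2.2 onwards, approxQuantile also accepts a list of column names, so several medians can be approximated in one pass. A sketch, where num1 and num2 are hypothetical column names:

# Spark 2.2+: one list of quantiles is returned per input column
medians = df.approxQuantile(["num1", "num2"], [0.5], 0.01)
# medians[0][0] -> approximate median of num1
# medians[1][0] -> approximate median of num2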

Note, as these are approximate numbers (via approxQuantile), in general this should work well. If you need the exact median, one approach is to use numpy.median. The code snippet below is updated for this df example based on gench's SO response to How to find the median in Apache Spark with Python Dataframe API?:

from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
import numpy as np

def find_median(values):
    try:
        # get the median of the values in the list in each row
        median = np.median(values)
        return round(float(median), 2)
    except Exception:
        # if there is anything wrong with the given values
        return None

median_finder = F.udf(find_median, FloatType())

df2 = df.groupBy("id").agg(F.collect_list("num").alias("nums"))
df2 = df2.withColumn("median", median_finder("nums"))

# print out
df2.show()

with the output of:

+---+--------------------+------+
| id|                nums|median|
+---+--------------------+------+
|  1|[0.0, 0.0, 1.0, 1...|   1.0|
+---+--------------------+------+
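The same median_finder UDF extends naturally to several double columns: collect each column into its own list in the one aggregation, then apply the UDF per collected list. A sketch, again with hypothetical columns num1 and num2:

# one collect_list per double column, one median per collected list
df3 = (df.groupBy("id")
         .agg(F.collect_list("num1").alias("nums1"),
              F.collect_list("num2").alias("nums2"))
         .withColumn("median1", median_finder("nums1"))
         .withColumn("median2", median_finder("nums2")))
df3.show()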

Updated: Spark 1.6 Scala version using RDDs

If you are using Spark 1.6, you can calculate the median using Scala code via Eugene Zhulenev's response to How can I calculate the exact median with Apache Spark. Below is the modified code that works with our example.

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

  val rdd: RDD[Double] = sc.parallelize(Seq(0.0, 0.0, 1.0, 1.0, 1.0, 1.0))

  val sorted = rdd.sortBy(identity).zipWithIndex().map {
    case (v, idx) => (idx, v)
  }

  val count = sorted.count()

  val median: Double = if (count % 2 == 0) {
    val l = count / 2 - 1
    val r = l + 1
    (sorted.lookup(l).head + sorted.lookup(r).head).toDouble / 2
  } else sorted.lookup(count / 2).head.toDouble

Output:

// output
import org.apache.spark.SparkContext._
rdd: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[227] at parallelize at <console>:34
sorted: org.apache.spark.rdd.RDD[(Long, Double)] = MapPartitionsRDD[234] at map at <console>:36
count: Long = 6
median: Double = 1.0

Note, this is calculating the exact median using RDDs, i.e. you will need to convert the DataFrame column into an RDD to perform this calculation.
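A minimal pySpark sketch of that conversion, mirroring the sort-and-index logic of the Scala code above (assuming df has the single num column from earlier):

# pull the column out of the DataFrame as a plain RDD of floats
rdd = df.select("num").rdd.map(lambda row: row[0])

# sort ascending, index each value, and key the RDD by index for lookup
sorted_rdd = rdd.sortBy(lambda x: x).zipWithIndex().map(lambda vi: (vi[1], vi[0]))
count = sorted_rdd.count()

if count % 2 == 0:
    l = count // 2 - 1
    median = (sorted_rdd.lookup(l)[0] + sorted_rdd.lookup(l + 1)[0]) / 2.0
else:
    median = sorted_rdd.lookup(count // 2)[0]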
