calculate median values with even number of rows in pyspark


Problem description

I am using this formula, taken from another SO post, to calculate the median values of columns in pyspark:

columns = ['id', 'dogs', 'cats']
vals = [(1, 2, 0),(2, 0, 1)]
df = sqlContext.createDataFrame(vals, columns)
# approxQuantile(columns, probabilities, relativeError); relativeError=0 requests the exact quantile
df.approxQuantile(list(c for c in df.columns), [0.5], 0)

The formula works when there is an odd number of rows in the df, but if the number of rows is even it does not take the mean of the two central elements (after sorting, of course); it only returns the first of them.

For example, the result of the code above is:

[[1.0], [0.0], [0.0]] 

whereas it should be:

[[1.5], [1.0], [0.5]] 
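
The reason is that approxQuantile only ever returns values that actually occur in the column, so it never interpolates between the two central elements. A minimal sketch illustrating this, reusing the sqlContext from above:

df4 = sqlContext.createDataFrame([(1,), (2,), (3,), (4,)], ['x'])
# returns one of the two middle data points (the lower one, per the behaviour
# reported above), never the interpolated 2.5
df4.approxQuantile('x', [0.5], 0)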

How can I correctly calculate the median value in pyspark?

Answer

I computed the exact median by group (without using numpy). You can easily adapt the approach by removing the Window part.

  • I first assign a row_number to each value (after sorting) within each group, and count the number of rows per group.
  • For groups with an even number of rows, I keep the two middle rows.
  • For groups with an odd number of rows, I keep the middle row.
  • I then compute the mean of the rows kept for each group.
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql.window import Window

columns = ['group1_col', 'group2_col', 'value_col']
vals = [['a', 'aa', 1],
        ['a', 'aa', 2],
        ['b', 'aa', 2],
        ['b', 'aa', 0],
        ['c', 'aa', 0],
        ['c', 'bb', 1],
        ['d', 'bb', 10],
        ['d', 'bb', 20],
        ['d', 'bb', 30],
        ]

df = spark.createDataFrame(vals, columns)

def compute_median(self, col, median_name, by_col=None):
  """ Method to be added to spark native DataFrame class """
  df_without_null = self.filter(F.col(col).isNotNull())
  
  # window over the whole DataFrame, or per group when by_col is given
  window_spec = Window.partitionBy()
  if by_col is not None:
      window_spec = Window.partitionBy(by_col)
  window_spec_order = window_spec.orderBy(col)

  df = (df_without_null
        .withColumn('row_number', F.row_number().over(window_spec_order))                     
        .withColumn('total_rows', F.count(F.lit(1)).over(window_spec))
        )

  # keep the middle row (ceil(n/2)); for an even row count also keep the
  # row just above the middle (n/2 + 1)
  row_to_keep = (
      df
      .filter((F.col('row_number') == F.ceil(F.col('total_rows') / 2))
              | ((F.col('total_rows') % 2 == 0)
                 & (F.col('row_number') == F.floor(F.col('total_rows') / 2) + 1)))
  )

  if by_col is None:
      return row_to_keep.select(F.mean(F.col(col)).alias(median_name))
  return row_to_keep.groupBy(by_col).agg(F.mean(F.col(col)).alias(median_name))

# Add method to DataFrame class
DataFrame.compute_median = compute_median

# med = df.compute_median("value_col", "global_median")
# med.collect()
# global_median
#      2.0

# med_group1 = df.compute_median("value_col", "median", 'group1_col')
# med_group1.collect()
# group1_col | median
#    a       | 1.5
#    b       | 1.0
#    c       | 0.5
#    d       | 20.0

# med_group2 = df.compute_median("value_col", "median", 'group2_col')
# med_group2.collect()
# group2_col | median
#    aa       | 1.0
#    bb       | 15.0
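
To make the filter condition in compute_median concrete, here is a tiny standalone sketch (plain Python, illustrative only) showing which 1-based row_number values are kept for a few group sizes:

import math

def kept_rows(n):
    # positions kept by the filter for a group of n sorted rows
    keep = {math.ceil(n / 2)}        # middle row (lower middle for even n)
    if n % 2 == 0:
        keep.add(n // 2 + 1)         # upper middle row for even n
    return sorted(keep)

for n in (2, 3, 4, 5):
    print(n, kept_rows(n))
# 2 [1, 2]
# 3 [2]
# 4 [2, 3]
# 5 [3]

Averaging the values found at those positions gives the exact median in both the even and the odd case.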

You can check that my filter condition is equivalent to this one (longer, but maybe easier to understand):

res = (df_rank_and_number_or_row
       .filter(
           # even number of rows: keep the two middle rows
           ((F.col('total_rows') % 2 == 0)
            & ((F.col('row_number') == F.ceil(F.col('total_rows') / 2))
               | (F.col('row_number') == F.floor(F.col('total_rows') / 2) + 1)))
           # odd number of rows: keep the single middle row
           | ((F.col('total_rows') % 2 != 0)
              & (F.col('row_number') == F.ceil(F.col('total_rows') / 2)))
       ))
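
As a quick cross-check, here is a minimal sketch assuming Spark >= 2.1, where the exact percentile SQL aggregate is built in; it interpolates between the two central values and should reproduce the grouped medians above:

import pyspark.sql.functions as F

# exact median per group via the built-in SQL percentile aggregate
# (interpolates for even-sized groups); assumes Spark >= 2.1
(df.groupBy('group1_col')
   .agg(F.expr('percentile(value_col, 0.5)').alias('median'))
   .show())
# expected: a -> 1.5, b -> 1.0, c -> 0.5, d -> 20.0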
