PySpark: retrieve mean and the count of values around the mean for groups within a dataframe


Problem Description



My raw data comes in a tabular format. It contains observations from different variables. Each observation consists of the variable name, the timestamp and the value at that time.

Variable [string], Time [datetime], Value [float]

The data is stored as Parquet in HDFS and loaded into a Spark DataFrame (df).

Now I want to calculate default statistics like Mean, Standard Deviation and others for each variable. Afterwards, once the Mean has been retrieved, I want to filter/count those values for that variable that are closely around the Mean.

Therefore I need to get the mean for each variable first. This is why I'm using GroupBy to get the statistics for each variable (not for the whole dataset).

df_stats = df.groupBy(df.Variable).agg( \
    count(df.Variable).alias("count"), \
    mean(df.Value).alias("mean"), \
    stddev(df.Value).alias("std_deviation"))

With the Mean for each variable I can then filter those values (just the count) for that specific variable that lie around the Mean. Therefore I need all observations (values) for that variable. Those values are in the original dataframe df and not in the aggregated/grouped dataframe df_stats.

Finally I want one dataframe like the aggregated/grouped df_stats with a new column "count_around_mean".

I was thinking to use df_stats.map(...) or df_stats.join(df, df.Variable). But I'm stuck on the red arrows :(
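
For reference, a minimal sketch of how the join idea could look, assuming the df and df_stats defined above; the count/when combination and the radius of two standard deviations are purely illustrative choices, not something fixed by the question:

from pyspark.sql.functions import abs as abs_, col, count, when

# Sketch: join each observation with its per-variable statistics and count
# the values that fall within 2 standard deviations of the mean
# (the radius of 2 is an arbitrary, illustrative choice).
df_counts = (df
    .join(df_stats, on="Variable")
    .groupBy("Variable", "count", "mean", "std_deviation")
    .agg(count(when(
        abs_(col("Value") - col("mean")) < 2 * col("std_deviation"), True
    )).alias("count_around_mean")))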

Question: How would you realize that?

Temporary Solution: Meanwhile I'm using a solution that is based on your idea. But the range functions for stddev radius 2 and 3 do not work. They always yield an

AttributeError saying NullType has no _jvm.

from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *

w1 = Window().partitionBy("Variable")
w2 = Window.partitionBy("Variable").orderBy("Time")

def stddev_pop_w(col, w):
    #Built-in stddev doesn't support windowing
    return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2))

def isInRange(value, mean, stddev, radius):
    try:
        if (abs(value - mean) < radius * stddev):
            return 1
        else:
            return 0
    except AttributeError:
        return -1

delta = col("Time").cast("long") - lag("Time", 1).over(w2).cast("long")
#f = udf(lambda (value, mean, stddev, radius): abs(value - mean) < radius * stddev, IntegerType())
f2 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 2), IntegerType())
f3 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 3), IntegerType())

df \
    .withColumn("mean", mean("Value").over(w1)) \
    .withColumn("std_deviation", stddev_pop_w(col("Value"), w1)) \
    .withColumn("delta", delta) \
    .withColumn("stddev_2", f2("Value", "mean", "std_deviation")) \
    .withColumn("stddev_3", f3("Value", "mean", "std_deviation")) \
    .show(5, False)

#df2.withColumn("std_dev_3", stddev_range(col("Value"), w1)) \

Solution

Spark 2.0+:

You can replace stddev_pop_w with one of the built-in pyspark.sql.functions.stddev* functions.
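
For example, a minimal sketch assuming Spark 2.0+ and the Variable/Value columns from the question:

from pyspark.sql.window import Window
from pyspark.sql.functions import avg, stddev_pop

w = Window.partitionBy("Variable")

# From Spark 2.0 on, built-in aggregates such as stddev_pop can be
# used directly as window functions.
df_with_stats = (df
    .withColumn("mean", avg("Value").over(w))
    .withColumn("std_deviation", stddev_pop("Value").over(w)))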

Spark < 2.0:

In general there is no need for aggregation with join. Instead you can compute statistics without collapsing the rows using window functions. Assuming your data looks like this:

import numpy as np
import pandas as pd
from pyspark.sql.functions import mean

n = 10000
k = 20

np.random.seed(100)

df = sqlContext.createDataFrame(pd.DataFrame({
    "id": np.arange(n),
    "variable": np.random.choice(k, n),
    "value": np.random.normal(0,  1, n)
}))

You can define a window partitioned by variable:

from pyspark.sql.window import Window

w = Window().partitionBy("variable")

and compute statistics as follows:

from pyspark.sql.functions import avg, col, pow, sqrt

def stddev_pop_w(col, w):
    """Builtin stddev doesn't support windowing
    You can easily implement sample variant as well
    """
    return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2))


(df
    .withColumn("stddev", stddev_pop_w(col("value"), w))
    .withColumn("mean", avg("value").over(w))
    .show(5, False))

## +---+--------------------+--------+------------------+--------------------+
## |id |value               |variable|stddev            |mean                |
## +---+--------------------+--------+------------------+--------------------+
## |47 |0.77212446947439    |0       |1.0103781346123295|0.035316745261099715|
## |60 |-0.931463439483327  |0       |1.0103781346123295|0.035316745261099715|
## |86 |1.0199074337552294  |0       |1.0103781346123295|0.035316745261099715|
## |121|-1.619408643898953  |0       |1.0103781346123295|0.035316745261099715|
## |145|-0.16065930935765935|0       |1.0103781346123295|0.035316745261099715|
## +---+--------------------+--------+------------------+--------------------+
## only showing top 5 rows
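
As the docstring above mentions, a sample variant can be implemented as well; one possible sketch, applying Bessel's correction with the window row count (the function name is illustrative):

from pyspark.sql.functions import avg, count, pow, sqrt

def stddev_samp_w(col, w):
    """Sample standard deviation over a window (Bessel-corrected).
    Undefined (division by zero) for windows containing a single row."""
    n = count(col).over(w)
    var_pop = avg(col * col).over(w) - pow(avg(col).over(w), 2)
    return sqrt(var_pop * n / (n - 1))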

Just for comparison, aggregation with a join:

from pyspark.sql.functions import stddev, avg, broadcast

df.join(
    broadcast(df.groupBy("variable").agg(avg("value"), stddev("value"))),
    ["variable"]
)
