PySpark: retrieve mean and the count of values around the mean for groups within a dataframe
Problem Description
My raw data comes in a tabular format. It contains observations from different variables. Each observation has the variable name, the timestamp and the value at that time:

Variable [string], Time [datetime], Value [float]

The data is stored as Parquet in HDFS and loaded into a Spark Dataframe (df).

Now I want to calculate default statistics like Mean, Standard Deviation and others for each variable. Afterwards, once the Mean has been retrieved, I want to filter/count those values for that variable that are closely around the Mean.

Therefore I need to get the mean for each variable first. This is why I'm using GroupBy to get the statistics for each variable (not for the whole dataset):
df_stats = df.groupBy(df.Variable).agg( \
count(df.Variable).alias("count"), \
mean(df.Value).alias("mean"), \
stddev(df.Value).alias("std_deviation"))
With the Mean for each variable I can then filter those values (just the count) for that specific variable that are around the Mean. Therefore I need all observations (values) for that variable. Those values are in the original dataframe df and not in the aggregated/grouped dataframe df_stats.

Finally I want one dataframe like the aggregated/grouped df_stats with a new column "count_around_mean".

I was thinking of using df_stats.map(...) or df_stats.join(df, df.Variable), but I'm stuck on the red arrows :(

Question: How would you realize that?

Temporary Solution: Meanwhile I'm using a solution that is based on your idea, but the range functions for stddev radius 2 and 3 do not work. They always yield an AttributeError saying that NullType has no _jvm:

from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
w1 = Window().partitionBy("Variable")
w2 = Window.partitionBy("Variable").orderBy("Time")
def stddev_pop_w(col, w):
#Built-in stddev doesn't support windowing
return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2))
def isInRange(value, mean, stddev, radius):
try:
if (abs(value - mean) < radius * stddev):
return 1
else:
return 0
except AttributeError:
return -1
delta = col("Time").cast("long") - lag("Time", 1).over(w2).cast("long")
#f = udf(lambda (value, mean, stddev, radius): abs(value - mean) < radius * stddev, IntegerType())
f2 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 2), IntegerType())
f3 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 3), IntegerType())
df \
.withColumn("mean", mean("Value").over(w1)) \
.withColumn("std_deviation", stddev_pop_w(col("Value"), w1)) \
.withColumn("delta", delta)
.withColumn("stddev_2", f2("Value", "mean", "std_deviation")) \
.withColumn("stddev_3", f3("Value", "mean", "std_deviation")) \
.show(5, False)
#df2.withColumn("std_dev_3", stddev_range(col("Value"), w1)) \
Answer

Spark 2.0+:

You can replace stddev_pop_w with one of the built-in pyspark.sql.functions.stddev* functions.

Spark < 2.0:

In general there is no need for aggregation with join. Instead you can compute statistics without collapsing the rows, using window functions. Assuming your data looks like this:

import numpy as np
import pandas as pd
from pyspark.sql.functions import mean
n = 10000
k = 20
np.random.seed(100)
df = sqlContext.createDataFrame(pd.DataFrame({
"id": np.arange(n),
"variable": np.random.choice(k, n),
"value": np.random.normal(0, 1, n)
}))
You can define a window with partitioning by variable and compute statistics as follows:

from pyspark.sql.window import Window
w = Window().partitionBy("variable")
from pyspark.sql.functions import avg, col, pow, sqrt
def stddev_pop_w(col, w):
"""Builtin stddev doesn't support windowing
You can easily implement sample variant as well
"""
return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2))
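# The docstring above hints that a sample variant is also easy to implement.
# A possible sketch, applying Bessel's correction via a per-window count
# (this needs pyspark.sql.functions.count, an import the snippet above does not have):
from pyspark.sql.functions import count

def stddev_samp_w(col, w):
    n = count(col).over(w)
    return sqrt((avg(col * col).over(w) - pow(avg(col).over(w), 2)) * n / (n - 1))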
(df
.withColumn("stddev", stddev_pop_w(col("value"), w))
.withColumn("mean", avg("value").over(w))
.show(5, False))
## +---+--------------------+--------+------------------+--------------------+
## |id |value |variable|stddev |mean |
## +---+--------------------+--------+------------------+--------------------+
## |47 |0.77212446947439 |0 |1.0103781346123295|0.035316745261099715|
## |60 |-0.931463439483327 |0 |1.0103781346123295|0.035316745261099715|
## |86 |1.0199074337552294 |0 |1.0103781346123295|0.035316745261099715|
## |121|-1.619408643898953 |0 |1.0103781346123295|0.035316745261099715|
## |145|-0.16065930935765935|0 |1.0103781346123295|0.035316745261099715|
## +---+--------------------+--------+------------------+--------------------+
## only showing top 5 rows
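With the per-row mean and stddev columns in place, the "count_around_mean" the question asks for can be obtained by filtering and then grouping. The following is only a minimal sketch building on the snippet above; the radius of 2 standard deviations and the reuse of the sample columns variable/value are assumptions:

from pyspark.sql.functions import abs as abs_, col, count

# Keep only rows within 2 standard deviations of the per-variable mean,
# then count the remaining observations per variable.
(df
    .withColumn("stddev", stddev_pop_w(col("value"), w))
    .withColumn("mean", avg("value").over(w))
    .where(abs_(col("value") - col("mean")) < 2 * col("stddev"))
    .groupBy("variable")
    .agg(count("value").alias("count_around_mean"))
    .show(5, False))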
Just for comparison, aggregate with join:

from pyspark.sql.functions import stddev, avg, broadcast
df.join(
broadcast(df.groupBy("variable").agg(avg("value"), stddev("value"))),
["variable"]
)
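On Spark 2.0+, as noted above, the custom stddev_pop_w helper is not needed because the built-in stddev* functions can be applied over a window directly. A minimal sketch of the equivalent computation, assuming the same df and window w as in the Spark < 2.0 example:

from pyspark.sql.functions import avg, stddev_pop

(df
    .withColumn("stddev", stddev_pop("value").over(w))
    .withColumn("mean", avg("value").over(w))
    .show(5, False))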