PySpark: retrieve mean and the count of values around the mean for groups within a dataframe
Problem description
My raw data comes in a tabular format. It contains observations of different variables. Each observation consists of the variable name, the timestamp and the value at that time:
Variable [string], Time [datetime], Value [float]
The data is stored as Parquet in HDFS and loaded into a Spark DataFrame (df).
From that DataFrame I want to calculate default statistics such as the mean and the standard deviation for each variable. Afterwards, once the mean has been retrieved, I want to filter/count the values of that variable that lie close to the mean.
Therefore I first need the mean of each variable. This is why I'm using groupBy to get the statistics per variable (not for the whole dataset):
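from pyspark.sql.functions import count, mean, stddev

# One row per variable: number of observations, mean and (sample) standard deviation
df_stats = df.groupBy(df.Variable).agg(
    count(df.Variable).alias("count"),
    mean(df.Value).alias("mean"),
    stddev(df.Value).alias("std_deviation"))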
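To see what this aggregation produces without a cluster, here is a plain-Python sketch of the same per-variable statistics. It is not Spark code: the `rows` list and the `group_stats` helper are hypothetical stand-ins for `df` and `df_stats`. Spark's `stddev` is an alias of `stddev_samp` (the sample standard deviation), so the sketch uses `statistics.stdev`.

```python
import statistics
from collections import defaultdict

# Hypothetical observations: (Variable, Time, Value); Time is not used here.
rows = [
    ("a", 1, 1.0), ("a", 2, 2.0), ("a", 3, 3.0), ("a", 4, 10.0),
    ("b", 1, 5.0), ("b", 2, 5.0), ("b", 3, 6.0),
]

def group_stats(rows):
    """Mimic df.groupBy("Variable").agg(count, mean, stddev) in plain Python."""
    values = defaultdict(list)
    for variable, _time, value in rows:
        values[variable].append(value)
    return {
        variable: {
            "count": len(vs),
            "mean": statistics.mean(vs),
            # Sample standard deviation, like Spark's stddev / stddev_samp;
            # statistics.stdev needs at least two values.
            "std_deviation": statistics.stdev(vs) if len(vs) > 1 else None,
        }
        for variable, vs in values.items()
    }

df_stats = group_stats(rows)
print(df_stats["a"])
```

Each variable collapses to a single record, which is why the raw values are no longer available for the "around the mean" count.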
With the mean of each variable, I can then filter (just count) the values of that specific variable that lie around the mean. For that I need all observations (values) of the variable. Those values are in the original dataframe df, not in the aggregated/grouped dataframe df_stats.
Finally, I want one dataframe like the aggregated/grouped df_stats, with an additional column "count_around_mean".
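The target result can be sketched in plain Python as two passes. The first pass aggregates per variable. The second pass goes back over the raw observations and counts those within `radius` standard deviations of their variable's mean. `radius` and `stats_with_count_around_mean` are hypothetical names, and the question does not define how close "around the mean" is. The sketch therefore assumes a multiple of the (population) standard deviation, as in the temporary solution below.

```python
import statistics
from collections import defaultdict

# Hypothetical observations: (Variable, Time, Value)
rows = [
    ("a", 1, 1.0), ("a", 2, 2.0), ("a", 3, 3.0), ("a", 4, 10.0),
    ("b", 1, 5.0), ("b", 2, 5.0), ("b", 3, 6.0),
]

def stats_with_count_around_mean(rows, radius=1.0):
    # Pass 1: aggregate per variable (what df_stats holds)
    values = defaultdict(list)
    for variable, _time, value in rows:
        values[variable].append(value)
    stats = {
        variable: {
            "count": len(vs),
            "mean": statistics.mean(vs),
            "std_deviation": statistics.pstdev(vs),  # population std dev
            "count_around_mean": 0,
        }
        for variable, vs in values.items()
    }
    # Pass 2: revisit the raw observations and look up each one's group stats
    for variable, _time, value in rows:
        s = stats[variable]
        if abs(value - s["mean"]) < radius * s["std_deviation"]:
            s["count_around_mean"] += 1
    return stats

print(stats_with_count_around_mean(rows, radius=1.0))
```

The second pass is the step that needs both the aggregate and the original rows. In Spark, that corresponds to the join or window discussed in the answer.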
I was thinking of using df_stats.map(...) or df_stats.join(df, df.Variable), but I'm stuck at the steps marked with the red arrows :(
Question: How would you implement this?
Temporary solution: In the meantime I'm using a solution based on your idea. However, the range functions for the 2- and 3-standard-deviation ranges do not work. They always raise an AttributeError saying that 'NoneType' object has no attribute '_jvm'.
from pyspark.sql.window import Window
# Import the Spark functions explicitly: "from pyspark.sql.functions import *"
# shadows Python's builtin abs() and pow(), so abs() inside the UDF calls the
# Spark Column function on the executors and fails with
# "AttributeError: 'NoneType' object has no attribute '_jvm'".
from pyspark.sql.functions import avg, col, lag, mean, sqrt, udf
from pyspark.sql.functions import pow as spark_pow
from pyspark.sql.types import IntegerType

w1 = Window.partitionBy("Variable")
w2 = Window.partitionBy("Variable").orderBy("Time")

def stddev_pop_w(col, w):
    # Built-in stddev doesn't support windowing (Spark < 2.0)
    return sqrt(avg(col * col).over(w) - spark_pow(avg(col).over(w), 2))

def isInRange(value, mean, stddev, radius):
    # Plain Python: abs() here is the builtin, not pyspark.sql.functions.abs
    if value is None or mean is None or stddev is None:
        return None
    return 1 if abs(value - mean) < radius * stddev else 0

# Seconds since the previous observation of the same variable
delta = col("Time").cast("long") - lag("Time", 1).over(w2).cast("long")

f2 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 2), IntegerType())
f3 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 3), IntegerType())

(df
    .withColumn("mean", mean("Value").over(w1))
    .withColumn("std_deviation", stddev_pop_w(col("Value"), w1))
    .withColumn("delta", delta)
    .withColumn("stddev_2", f2("Value", "mean", "std_deviation"))
    .withColumn("stddev_3", f3("Value", "mean", "std_deviation"))
    .show(5, False))
Answer
Spark 2.0+:
You can replace stddev_pop_w with one of the built-in pyspark.sql.functions.stddev* functions (stddev, stddev_samp, stddev_pop), which can be used over a window with .over(w).
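The hand-rolled `stddev_pop_w` relies on the identity Var(X) = E[X^2] - E[X]^2 for the population variance. It therefore corresponds to the built-in `stddev_pop`, not to `stddev`/`stddev_samp`. The following quick plain-Python check of that identity uses made-up sample values and a hypothetical helper name:

```python
import statistics

def stddev_pop_formula(values):
    """Same formula as stddev_pop_w: sqrt(avg(x * x) - avg(x) ** 2)."""
    n = len(values)
    mean_of_squares = sum(v * v for v in values) / n
    square_of_mean = (sum(values) / n) ** 2
    # max(..., 0.0) guards against tiny negative results from rounding
    return max(mean_of_squares - square_of_mean, 0.0) ** 0.5

values = [0.77, -0.93, 1.02, -1.62, -0.16]
print(stddev_pop_formula(values), statistics.pstdev(values))
```

This formula can lose precision when the mean is large compared with the spread of the values, because it subtracts two nearly equal numbers. That is one more reason to prefer the built-in functions where they are available.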
Spark < 2.0:
In general, there is no need to aggregate and then join. Instead, you can compute the statistics with window functions, which do not collapse the rows. Assuming your data looks like this:
import numpy as np
import pandas as pd
from pyspark.sql.functions import mean
n = 10000
k = 20
np.random.seed(100)
df = sqlContext.createDataFrame(pd.DataFrame({
"id": np.arange(n),
"variable": np.random.choice(k, n),
"value": np.random.normal(0, 1, n)
}))
You can define a window partitioned by variable:
from pyspark.sql.window import Window
w = Window().partitionBy("variable")
and compute statistics as follows:
from pyspark.sql.functions import avg, col, pow, sqrt
def stddev_pop_w(col, w):
"""Builtin stddev doesn't support windowing
You can easily implement sample variant as well
"""
return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2))
(df
.withColumn("stddev", stddev_pop_w(col("value"), w))
.withColumn("mean", avg("value").over(w))
.show(5, False))
## +---+--------------------+--------+------------------+--------------------+
## |id |value |variable|stddev |mean |
## +---+--------------------+--------+------------------+--------------------+
## |47 |0.77212446947439 |0 |1.0103781346123295|0.035316745261099715|
## |60 |-0.931463439483327 |0 |1.0103781346123295|0.035316745261099715|
## |86 |1.0199074337552294 |0 |1.0103781346123295|0.035316745261099715|
## |121|-1.619408643898953 |0 |1.0103781346123295|0.035316745261099715|
## |145|-0.16065930935765935|0 |1.0103781346123295|0.035316745261099715|
## +---+--------------------+--------+------------------+--------------------+
## only showing top 5 rows
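The docstring notes that a sample variant is easy to add. One way is to rescale the population variance by n / (n - 1) (Bessel's correction). The sketch below is in plain Python rather than Spark. In the window version, n would come from a count over the same window, for example count(col).over(w); that wiring is an assumption, not part of the answer's code.

```python
import statistics

def stddev_samp_formula(values):
    """Sample standard deviation from the same sums stddev_pop_w uses."""
    n = len(values)
    if n < 2:
        return None  # undefined for a single observation
    mean_of_squares = sum(v * v for v in values) / n
    square_of_mean = (sum(values) / n) ** 2
    var_pop = max(mean_of_squares - square_of_mean, 0.0)
    # Bessel's correction: rescale the population variance by n / (n - 1)
    return (var_pop * n / (n - 1)) ** 0.5

values = [0.77, -0.93, 1.02, -1.62, -0.16]
print(stddev_samp_formula(values), statistics.stdev(values))
```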
Just for comparison, aggregation followed by a join:
from pyspark.sql.functions import stddev, avg, broadcast
df.join(
broadcast(df.groupBy("variable").agg(avg("value"), stddev("value"))),
["variable"]
)
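Both approaches attach the group statistics to every row. The window partitions the rows, while the join broadcasts a small aggregated table and joins it back. The plain-Python sketch below, with hypothetical `rows` and helper names, mimics both and checks that they agree. The join as written uses `stddev` (sample), while `stddev_pop_w` is the population version. To compare like with like, the sketch uses the population standard deviation on both sides.

```python
import statistics
from collections import defaultdict

# Hypothetical observations: (id, variable, value)
rows = [(0, "a", 1.0), (1, "b", 5.0), (2, "a", 2.0),
        (3, "a", 3.0), (4, "b", 6.0), (5, "a", 10.0)]

def window_style(rows):
    """Mimic avg/stddev .over(Window.partitionBy("variable")): rows are not collapsed."""
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[1]].append(row)
    out = []
    for variable, part in partitions.items():
        vs = [value for _id, _var, value in part]
        m, s = statistics.mean(vs), statistics.pstdev(vs)
        # Every row of the partition keeps its identity and gains the stats
        out.extend((row_id, variable, value, m, s) for row_id, _var, value in part)
    return out

def join_style(rows):
    """Mimic df.join(broadcast(df.groupBy("variable").agg(...)), ["variable"])."""
    values = defaultdict(list)
    for _id, variable, value in rows:
        values[variable].append(value)
    # The small aggregated table, playing the role of the broadcast side
    aggregated = {v: (statistics.mean(vs), statistics.pstdev(vs)) for v, vs in values.items()}
    return [(row_id, variable, value) + aggregated[variable]
            for row_id, variable, value in rows]

# Row order differs between the two, so compare after sorting by id
print(sorted(window_style(rows)) == sorted(join_style(rows)))
```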