How to calculate mean and standard deviation given a PySpark DataFrame?
Question
I have a PySpark DataFrame (not pandas) called df that is too large to use collect(). Therefore the code below is not efficient. It worked with a smaller amount of data, but now it fails.
import numpy as np

myList = df.collect()
total = []
for product, nb in myList:
    for p2, score in nb:
        total.append(score)
mean = np.mean(total)
std = np.std(total)
Is there any way to get mean and std as two variables by using pyspark.sql.functions or similar?
from pyspark.sql.functions import mean as mean_, std as std_
I could use withColumn; however, that approach applies the calculation row by row, and it does not return a single variable.
UPDATE: Sample contents of df:
+----------+------------------+
|product_PK|          products|
+----------+------------------+
|       680|[[691,1], [692,5]]|
|       685|[[691,2], [692,2]]|
|       684|[[691,1], [692,3]]|
+----------+------------------+
I should calculate the mean and standard deviation of the score values, e.g. the value 1 in [691,1] is one of the scores.
Answer
You can use the built-in functions to get aggregate statistics. Here's how to get the mean and standard deviation.
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

df_stats = df.select(
    _mean(col('columnName')).alias('mean'),
    _stddev(col('columnName')).alias('std')
).collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']
Note that there are three different standard deviation functions. From the docs, the one I used (stddev) returns the following:
Aggregate function: returns the unbiased sample standard deviation of the expression in a group
You could use the describe() method as well:
df.describe().show()
Refer to this link for more info: pyspark.sql.functions
UPDATE: This is how you can work through the nested data.
Use explode to extract the values into separate rows, then call mean and stddev as shown above.
Here is an MWE:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev

# mock up sample dataframe
df = sqlCtx.createDataFrame(
    [(680, [[691,1], [692,5]]), (685, [[691,2], [692,2]]), (684, [[691,1], [692,3]])],
    ["product_PK", "products"]
)

# udf to get the "score" value - returns the item at index 1
get_score = udf(lambda x: x[1], IntegerType())

# explode column and get stats
df_stats = df.withColumn('exploded', explode(col('products')))\
    .withColumn('score', get_score(col('exploded')))\
    .select(
        _mean(col('score')).alias('mean'),
        _stddev(col('score')).alias('std')
    )\
    .collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']

print([mean, std])
Output:
[2.3333333333333335, 1.505545305418162]
You can verify that these values are correct using numpy:
import numpy as np

vals = [1, 5, 2, 2, 1, 3]
print([np.mean(vals), np.std(vals, ddof=1)])
Explanation: Your "products" column is a list of lists. Calling explode will make a new row for each element of the outer list. Then grab the "score" value from each of the exploded rows, which you have defined as the second element in a 2-element list. Finally, call the aggregate functions on this new column.