How to calculate mean and standard deviation given a PySpark DataFrame?

Question

I have a PySpark DataFrame (not pandas) called df that is too large to use collect() on, so the code below is inefficient. It worked with a smaller amount of data, but now it fails.

import numpy as np

# collect() pulls the entire DataFrame onto the driver,
# which is what fails once the data gets large
myList = df.collect()
total = []
for product, nb in myList:
    for p2, score in nb:
        total.append(score)
mean = np.mean(total)
std = np.std(total)

Is there any way to get mean and std as two variables by using pyspark.sql.functions or similar?

from pyspark.sql.functions import mean as mean_, std as std_  # note: the actual function is named stddev, not std

I could use withColumn; however, that approach applies the calculation row by row and does not return a single variable.

UPDATE:

Example contents of df:

+----------+------------------+
|product_PK|          products|
+----------+------------------+
|       680|[[691,1], [692,5]]|
|       685|[[691,2], [692,2]]|
|       684|[[691,1], [692,3]]|
+----------+------------------+

I need to calculate the mean and standard deviation of the score values, e.g. the value 1 in [691,1] is one of the scores.

Answer

You can use the built-in functions to get aggregate statistics. Here's how to get the mean and standard deviation.

from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

# compute both statistics in a single pass over the data;
# collect() here returns a one-row list, not the whole DataFrame
df_stats = df.select(
    _mean(col('columnName')).alias('mean'),
    _stddev(col('columnName')).alias('std')
).collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']
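
If you'd rather not index into a collected list, agg with first() gives the same result in one call; a minimal equivalent sketch, using the same placeholder columnName as above:

# agg + first() returns a single Row instead of a one-row list
row = df.agg(
    _mean(col('columnName')).alias('mean'),
    _stddev(col('columnName')).alias('std')
).first()
mean, std = row['mean'], row['std']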

Note that there are three different standard deviation functions in pyspark.sql.functions: stddev, stddev_samp, and stddev_pop. From the docs, the one I used (stddev) returns the following:

Aggregate function: returns the unbiased sample standard deviation of the expression in a group
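
For reference, all three variants can be compared side by side; a quick sketch (stddev is an alias for the sample version, stddev_samp, while stddev_pop divides by n instead of n - 1):

from pyspark.sql.functions import stddev, stddev_samp, stddev_pop

df.select(
    stddev('columnName').alias('stddev'),      # unbiased sample std (alias of stddev_samp)
    stddev_samp('columnName').alias('samp'),   # sample std, divides by n - 1
    stddev_pop('columnName').alias('pop')      # population std, divides by n
).show()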

You could use the describe() method as well:

df.describe().show()
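
Keep in mind that describe() returns its statistics as strings in a new DataFrame, so pulling numbers out of it takes a cast; a hedged sketch, again using the placeholder columnName:

# describe() keys each statistic by the 'summary' column and stores values as strings
stats = {row['summary']: row['columnName'] for row in df.describe('columnName').collect()}
mean = float(stats['mean'])
std = float(stats['stddev'])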

Refer to this link for more info: pyspark.sql.functions

UPDATE: This is how you can work through the nested data.

Use explode to extract the values into separate rows, then call mean and stddev as shown above.

Here's an MWE (minimal working example):

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev

# mock up sample dataframe (`spark` is the active SparkSession;
# older versions used a SQLContext such as sqlCtx instead)
df = spark.createDataFrame(
    [(680, [[691, 1], [692, 5]]), (685, [[691, 2], [692, 2]]), (684, [[691, 1], [692, 3]])],
    ["product_PK", "products"]
)

# udf to get the "score" value - returns the item at index 1
get_score = udf(lambda x: x[1], IntegerType())

# explode the nested column into one row per inner [product, score] pair,
# pull out the score, then aggregate
df_stats = df.withColumn('exploded', explode(col('products')))\
    .withColumn('score', get_score(col('exploded')))\
    .select(
        _mean(col('score')).alias('mean'),
        _stddev(col('score')).alias('std')
    )\
    .collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']

print([mean, std])

Output:

[2.3333333333333335, 1.505545305418162]

You can verify that these values are correct using numpy (ddof=1 gives the unbiased sample standard deviation, matching Spark's stddev):

import numpy as np

vals = [1, 5, 2, 2, 1, 3]
print([np.mean(vals), np.std(vals, ddof=1)])

Explanation: your "products" column is a list of lists. Calling explode makes a new row for each element of the outer list. Then grab the "score" value from each exploded row, which you defined as the second element of a 2-element list. Finally, call the aggregate functions on this new column.
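
As an aside, the UDF is not strictly required here. Since products is inferred as an array of arrays, the score can be read with getItem, which keeps everything in native Spark expressions; a sketch reusing the imports from the MWE above:

# same statistics without a Python UDF: index into each exploded pair directly
df_stats = df.select(explode(col('products')).alias('exploded'))\
    .select(
        _mean(col('exploded').getItem(1)).alias('mean'),
        _stddev(col('exploded').getItem(1)).alias('std')
    )\
    .collect()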
