在给定PySpark DataFrame的情况下,如何计算均值和标准差? [英] How to calculate mean and standard deviation given a PySpark DataFrame?

查看:1609
本文介绍了在给定PySpark DataFrame的情况下,如何计算均值和标准差?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个名为df的PySpark DataFrame(不是熊猫),它对于使用collect()来说是很大的.因此,下面给出的代码效率不高.它正在处理少量数据,但是现在失败了.

I have PySpark DataFrame (not pandas) called df that is quite large to use collect(). Therefore the below-given code is not efficient. It was working with a smaller amount of data, however now it fails.

import numpy as np

myList = df.collect()
total = []
for product,nb in myList:
    for p2,score in nb:
            total.append(score)
mean = np.mean(total)
std = np.std(total)

是否可以通过使用pyspark.sql.functions或类似方法将meanstd作为两个变量获取?

Is there any way to get mean and std as two variables by using pyspark.sql.functions or similar?

from pyspark.sql.functions import mean as mean_, std as std_

我可以使用withColumn,但是,这种方法逐行应用计算,并且不会返回单个变量.

I could use withColumn, however, this approach applies the calculations row by row, and it does not return a single variable.

更新:

df的示例内容:

+----------+------------------+
|product_PK|          products|
+----------+------------------+
|       680|[[691,1], [692,5]]|
|       685|[[691,2], [692,2]]|
|       684|[[691,1], [692,3]]|

我应该计算score值的平均值和标准偏差,例如[691,1]中的1值是分数之一.

I should calculate mean and standard deviation of score values, e.g. the value 1 in [691,1] is one of scores.

推荐答案

您可以使用内置函数来获取汇总统计信息.这是获取均值和标准差的方法.

You can use the built in functions to get aggregate statistics. Here's how to get mean and standard deviation.

from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

df_stats = df.select(
    _mean(col('columnName')).alias('mean'),
    _stddev(col('columnName')).alias('std')
).collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']

请注意,存在三种不同的标准偏差函数.从文档中,我使用的文档(stddev)返回以下内容:

Note that there are three different standard deviation functions. From the docs the one I used (stddev) returns the following:

聚合函数:返回无偏样本的标准偏差 分组中的表达式

Aggregate function: returns the unbiased sample standard deviation of the expression in a group

您也可以使用describe()方法:

df.describe().show()

有关更多信息,请参考此链接: pyspark.sql.functions

Refer to this link for more info: pyspark.sql.functions

更新:这是处理嵌套数据的方式.

UPDATE: This is how you can work through the nested data.

使用explode将值提取到单独的行中,然后调用meanstddev,如上所示.

Use explode to extract the values into separate rows, then call mean and stddev as shown above.

这是MWE:

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import explode, col, udf, mean as _mean, stddev as _stddev

# mock up sample dataframe
df = sqlCtx.createDataFrame(
    [(680, [[691,1], [692,5]]), (685, [[691,2], [692,2]]), (684, [[691,1], [692,3]])],
    ["product_PK", "products"]
)

# udf to get the "score" value - returns the item at index 1
get_score = udf(lambda x: x[1], IntegerType())

# explode column and get stats
df_stats = df.withColumn('exploded', explode(col('products')))\
    .withColumn('score', get_score(col('exploded')))\
    .select(
        _mean(col('score')).alias('mean'),
        _stddev(col('score')).alias('std')
    )\
    .collect()

mean = df_stats[0]['mean']
std = df_stats[0]['std']

print([mean, std])

哪个输出:

[2.3333333333333335, 1.505545305418162]

您可以使用numpy验证这些值正确:

You can verify that these values are correct using numpy:

vals = [1,5,2,2,1,3]
print([np.mean(vals), np.std(vals, ddof=1)])

说明:您的"products"列是listlist.调用explode将为外部list的每个元素添加一个新行.然后从每个爆炸行中获取"score"值,这些行已定义为2元素list中的第二个元素.最后,在此新列上调用聚合函数.

Explanation: Your "products" column is a list of lists. Calling explode will make a new row for each element of the outer list. Then grab the "score" value from each of the exploded rows, which you have defined as the second element in a 2-element list. Finally, call the aggregate functions on this new column.

这篇关于在给定PySpark DataFrame的情况下,如何计算均值和标准差?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆