PySpark divide column by its sum


Problem description

I am trying to divide columns in PySpark by their respective sums. My dataframe (using only one column here) looks like this:

event_rates = [[1,10.461016949152542], [2, 10.38953488372093], [3, 10.609418282548477]]
event_rates = spark.createDataFrame(event_rates, ['cluster_id','mean_encoded'])
event_rates.show()

+----------+------------------+
|cluster_id|      mean_encoded|
+----------+------------------+
|         1|10.461016949152542|
|         2| 10.38953488372093|
|         3|10.609418282548477|
+----------+------------------+

I tried two methods to do this, but both failed to produce results.

from pyspark.sql.functions import sum as spark_sum
cols = event_rates.columns[1:]
for each in cols:
    event_rates = event_rates.withColumn(each+"_scaled", event_rates[each]/spark_sum(event_rates[each]))

This gave me the following error:

org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and '`cluster_id`' is not an aggregate function. Wrap '((`mean_encoded` / sum(`mean_encoded`)) AS `mean_encoded_scaled`)' in windowing function(s) or wrap '`cluster_id`' in first() (or first_value) if you don't care which value you get.;;
Aggregate [cluster_id#22356L, mean_encoded#22357, (mean_encoded#22357 / sum(mean_encoded#22357)) AS mean_encoded_scaled#2

Following the question here, I tried the following:

stats = (event_rates.agg([spark_sum(x).alias(x + '_sum') for x in cols]))
event_rates = event_rates.join(broadcast(stats))
exprs = [event_rates[x] / event_rates[event_rates + '_sum'] for x in cols]
event_rates.select(exprs)

but I get an error on the first line:

AssertionError: all exprs should be Column
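
As an aside, that assertion is typically raised when agg receives a Python list instead of unpacked Column arguments. A minimal sketch of the unpacked call, reusing the cols and spark_sum names from above and assuming broadcast is imported from pyspark.sql.functions, might look like this:

from pyspark.sql.functions import broadcast, sum as spark_sum

# Unpack the list so agg receives Column arguments rather than a single list
stats = event_rates.agg(*[spark_sum(x).alias(x + '_sum') for x in cols])
event_rates = event_rates.join(broadcast(stats))
exprs = [(event_rates[x] / event_rates[x + '_sum']).alias(x + '_scaled') for x in cols]
event_rates.select('cluster_id', *exprs)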

How do I get around this?

Recommended answer

This is an example of how to divide the column mean_encoded by its sum. You need to sum the column first, then crossJoin the result back onto the original dataframe. After that, you can divide any column by its sum.

import pyspark.sql.functions as fn

# Sum the column over the whole dataframe, then crossJoin the single-row
# result back so every row carries the total as 'sum_mean_encoded'
event_rates = event_rates.crossJoin(
    event_rates.groupby().agg(fn.sum('mean_encoded').alias('sum_mean_encoded')))

# Divide the column by its total
event_rates_div = event_rates.select('cluster_id',
                                     'mean_encoded',
                                     fn.col('mean_encoded') / fn.col('sum_mean_encoded'))

Output:

+----------+------------------+---------------------------------+
|cluster_id|      mean_encoded|(mean_encoded / sum_mean_encoded)|
+----------+------------------+---------------------------------+
|         1|10.461016949152542|               0.3325183371367686|
|         2| 10.38953488372093|               0.3302461777809474|
|         3|10.609418282548477|               0.3372354850822839|
+----------+------------------+---------------------------------+
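
To extend the same idea to several columns at once, a rough sketch (starting again from the original event_rates dataframe, with a hypothetical cols list naming whichever numeric columns should be scaled) could compute every sum in a single aggregation and crossJoin the one-row result back:

import pyspark.sql.functions as fn

cols = ['mean_encoded']  # hypothetical list of columns to scale by their sums

# One aggregation that produces <col>_sum for every column of interest
sums = event_rates.groupby().agg(*[fn.sum(c).alias(c + '_sum') for c in cols])

# crossJoin the single-row sums back, then divide each column by its total
scaled = event_rates.crossJoin(sums).select(
    'cluster_id',
    *[(fn.col(c) / fn.col(c + '_sum')).alias(c + '_scaled') for c in cols])
scaled.show()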
