How to divide a column by its sum in a Spark DataFrame

Problem description

How can I divide a column by its own sum in a Spark DataFrame, efficiently and without immediately triggering a computation?

Suppose we have some data:

import pyspark
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as spf

spark = SparkSession.builder.master('local').getOrCreate()

data = spark.range(0, 100)

data # --> DataFrame[id: bigint]

I’d like to create a new column on this data frame called "normalized" that contains id / sum(id). One way to do it is to pre-compute the sum, like this:

s = data.select(spf.sum('id')).collect()[0][0]
data2 = data.withColumn('normalized', spf.col('id') / s)
data2 # --> DataFrame[id: bigint, normalized: double]

That works fine, but it immediately triggers a computation; if you're defining something similar for many columns it will cause multiple redundant passes over the data.

Another way to do it is with a windowing specification that includes the whole table:

w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
data3 = data.withColumn('normalized', spf.col('id') / spf.sum('id').over(w))
data3 # --> DataFrame[id: bigint, normalized: double]

In this case, it's fine to define data3, but once you try to actually compute it, Spark 2.2.0 will move all the data into a single partition, which typically causes the job to fail for large data sets.
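One way to see this (a quick check, not part of the original question) is to inspect the physical plan: for a window specification with no partitioning, Spark typically plans an Exchange SinglePartition step feeding the Window operator.

# Printing the physical plan normally shows an "Exchange SinglePartition"
# before the Window operator, i.e. every row is shuffled to one partition.
data3.explain()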

What other approaches are there to solving this problem, that don't trigger an immediate computation and that will work with large data sets? I'm interested in any solutions, not necessarily solutions based on pyspark.

Recommended answer

crossJoin with an aggregate is one approach:

data.crossJoin( 
    data.select(spf.sum('id').alias("sum_id"))
).withColumn("normalized", spf.col("id") / spf.col("sum_id"))
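The same crossJoin pattern extends to several columns whose sums are computed in a single aggregation pass; a minimal sketch, assuming a DataFrame df with hypothetical numeric columns a and b:

# Nothing is computed while these columns are defined; the sums are produced
# in one aggregation when an action is eventually run on df_norm.
totals = df.select(spf.sum('a').alias('sum_a'), spf.sum('b').alias('sum_b'))
df_norm = (df.crossJoin(totals)
             .withColumn('a_normalized', spf.col('a') / spf.col('sum_a'))
             .withColumn('b_normalized', spf.col('b') / spf.col('sum_b')))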

But I wouldn't worry about that concern from the question:

That works fine, but it immediately triggers a computation; if you're defining something similar for many columns it will cause multiple redundant passes over the data.

Just compute multiple statistics at once:

data2 = data.select(spf.rand(42).alias("x"), spf.randn(42).alias("y"))
mean_x, mean_y = data2.groupBy().mean().first()

and the rest is just an operation on local expressions:

data2.select(spf.col("x") - mean_x, spf.col("y") - mean_y)
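Applied back to the normalization question, the same one-pass idea might look like the sketch below (reusing the x and y columns defined above; the output column names are illustrative):

# A single aggregation job computes both sums; the divisions are then just
# local expressions, so no further passes over the data are required.
sum_x, sum_y = data2.groupBy().sum().first()
data2_norm = data2.select(
    (spf.col('x') / sum_x).alias('x_normalized'),
    (spf.col('y') / sum_y).alias('y_normalized'),
)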
