How to divide a column by its sum in a Spark DataFrame


Problem Description

How can I divide a column by its own sum in a Spark DataFrame, efficiently and without immediately triggering a computation?

Suppose we have some data:

import pyspark
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as spf

spark = SparkSession.builder.master('local').getOrCreate()

data = spark.range(0, 100)

data # --> DataFrame[id: bigint]

I'd like to create a new column on this data frame called "normalized" that contains id / sum(id). One way to do it is to pre-compute the sum, like this:

s = data.select(spf.sum('id')).collect()[0][0]
data2 = data.withColumn('normalized', spf.col('id') / s)
data2 # --> DataFrame[id: bigint, normalized: double]

That works fine, but it immediately triggers a computation; if you're defining something similar for many columns it will cause multiple redundant passes over the data.

Another way to do it is with a windowing specification that includes the whole table:

w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
data3 = data.withColumn('normalized', spf.col('id') / spf.sum('id').over(w))
data3 # --> DataFrame[id: bigint, normalized: double]

In this case, it's fine to define data3, but once you try to actually compute it, Spark 2.2.0 will move all the data into a single partition, which typically causes the job to fail for large data sets.

What other approaches are there to solving this problem, that don't trigger an immediate computation and that will work with large data sets? I'm interested in any solutions, not necessarily solutions based on pyspark.

Recommended Answer

crossJoin with aggregate is one approach:

data.crossJoin( 
    data.select(spf.sum('id').alias("sum_id"))
).withColumn("normalized", spf.col("id") / spf.col("sum_id"))
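
Since the aggregate side of the crossJoin is a single row, it can also be marked for broadcast so the large side isn't shuffled. A minimal sketch using the broadcast hint from pyspark.sql.functions (the data4 name is just for illustration):

# Broadcasting the one-row aggregate avoids shuffling the full data set for the join.
data4 = data.crossJoin(
    spf.broadcast(data.select(spf.sum('id').alias("sum_id")))
).withColumn("normalized", spf.col("id") / spf.col("sum_id"))
data4  # --> DataFrame[id: bigint, sum_id: bigint, normalized: double]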

But I wouldn't worry too much about it:

"That works fine, but it immediately triggers a computation; if you're defining something similar for many columns it will cause multiple redundant passes over the data."

Just compute multiple statistics at once:

data2 = data.select(spf.rand(42).alias("x"), spf.randn(42).alias("y"))
mean_x, mean_y = data2.groupBy().mean().first()

and the rest is just an operation on local expressions:

data2.select(spf.col("x") - mean_x, spf.col("y") - mean_y)
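
Applied back to the original question, the same idea works for the sum: collect it (together with any other statistics you need) in a single aggregation pass, then use it as a local value. A minimal sketch, with data5 as an illustrative name:

# One aggregation pass over the data returns several statistics at once.
sum_id, mean_id = data.agg(spf.sum('id'), spf.avg('id')).first()

# Defining the normalized column then uses sum_id as a plain Python value,
# so no additional aggregation over the data is needed.
data5 = data.withColumn('normalized', spf.col('id') / sum_id)
data5  # --> DataFrame[id: bigint, normalized: double]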

