Add column sum as new column in PySpark dataframe


Question

I'm using PySpark and I have a Spark dataframe with a bunch of numeric columns. I want to add a column that is the sum of all the other columns.

Suppose my dataframe had columns "a", "b", and "c". I know I can do this:

df.withColumn('total_col', df.a + df.b + df.c)

The problem is that I don't want to type out each column individually and add them, especially if I have a lot of columns. I want to be able to do this automatically or by specifying a list of column names that I want to add. Is there another way to do this?

Solution

This was not obvious. I see no row-based sum of the columns defined in the Spark DataFrames API.

This can be done in a fairly simple way:

newdf = df.withColumn('total', sum(df[col] for col in df.columns))

df.columns is supplied by pyspark as a list of strings giving all of the column names in the Spark Dataframe. For a different sum, you can supply any other list of column names instead.
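
As a minimal sketch of that idea, reusing the "a" and "b" column names from the example above, you could sum just a chosen subset of the columns:

# Hypothetical subset: sum only columns 'a' and 'b' instead of every column
cols_to_sum = ['a', 'b']
newdf = df.withColumn('partial_total', sum(df[col] for col in cols_to_sum))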

I did not try this as my first solution because I wasn't certain how it would behave. But it works.

This is overly complicated, but works as well.

You can do it like this:

  1. use df.columns to get a list of the names of the columns
  2. use that names list to make a list of the columns
  3. pass that list to something that will invoke the column's overloaded add function in a fold-type functional manner

With Python's reduce, some knowledge of how operator overloading works, and the pyspark code for columns, that becomes:

# In Python 3, reduce must be imported from functools (it is a builtin in Python 2)
from functools import reduce

def column_add(a, b):
    # Column.__add__ is the overloaded "+" operator on Spark columns
    return a.__add__(b)

newdf = df.withColumn('total_col',
                      reduce(column_add, (df[col] for col in df.columns)))

Note this is a Python reduce, not a Spark RDD reduce, and the parentheses around the second argument to reduce are required because it is a generator expression.
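
For instance, with the three example columns above, the reduce call simply expands into the chained addition you would otherwise write by hand:

# With columns 'a', 'b', 'c', reduce(column_add, (df[col] for col in df.columns))
# evaluates column_add(column_add(df['a'], df['b']), df['c']),
# i.e. the same Column expression as df['a'] + df['b'] + df['c']
newdf = df.withColumn('total_col', column_add(column_add(df['a'], df['b']), df['c']))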

Tested, and it works!

$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
...     return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]
