Adding a group count column to a PySpark dataframe
Question
I am coming from R and the tidyverse to PySpark due to its superior Spark handling, and I am struggling to map certain concepts from one context to the other.
In particular, suppose that I had a dataset like the following
x | y
--+--
a | 5
a | 8
a | 7
b | 1
and I wanted to add a column containing the number of rows for each x value, like so:
x | y | n
--+---+---
a | 5 | 3
a | 8 | 3
a | 7 | 3
b | 1 | 1
In dplyr, I would just say:
library(tidyverse)
df <- read_csv("...")
df %>%
    group_by(x) %>%
    mutate(n = n()) %>%
    ungroup()
and that would be that. I can do something almost as simple in PySpark if I'm looking to summarize by number of rows:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
(spark.read.csv("...")
    .groupBy(col("x"))
    .count()
    .show())
And I thought I understood that withColumn was equivalent to dplyr's mutate. However, when I do the following, PySpark tells me that withColumn is not defined for groupBy data:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

spark = SparkSession.builder.getOrCreate()
(spark.read.csv("...")
    .groupBy(col("x"))
    .withColumn("n", count("x"))  # AttributeError: 'GroupedData' object has no attribute 'withColumn'
    .show())
In the short run, I can simply create a second dataframe containing the counts and join it to the original dataframe. However, it seems like this could become inefficient in the case of large tables. What is the canonical way to accomplish this?
Recommended Answer
When you do a groupBy(), you have to specify the aggregation before you can display the results. For example:
import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data = [
    ('a', 5),
    ('a', 8),
    ('a', 7),
    ('b', 1),
]
df = spark.createDataFrame(data, ["x", "y"])
df.groupBy('x').count().select('x', f.col('count').alias('n')).show()
#+---+---+
#| x| n|
#+---+---+
#| b| 1|
#| a| 3|
#+---+---+
Here I used alias() to rename the column. But this only returns one row per group. If you want all rows with the count appended, you can do this with a Window:
from pyspark.sql import Window
w = Window.partitionBy('x')
df.select('x', 'y', f.count('x').over(w).alias('n')).sort('x', 'y').show()
#+---+---+---+
#| x| y| n|
#+---+---+---+
#| a| 5| 3|
#| a| 7| 3|
#| a| 8| 3|
#| b| 1| 1|
#+---+---+---+
Or if you're more comfortable with SQL, you can register the dataframe as a temporary view and take advantage of pyspark-sql to do the same thing:
df.createOrReplaceTempView('table')
spark.sql(
    'SELECT x, y, COUNT(x) OVER (PARTITION BY x) AS n FROM table ORDER BY x, y'
).show()
#+---+---+---+
#| x| y| n|
#+---+---+---+
#| a| 5| 3|
#| a| 7| 3|
#| a| 8| 3|
#| b| 1| 1|
#+---+---+---+