Adding a group count column to a PySpark dataframe


Problem Description

I am coming from R and the tidyverse to PySpark due to its superior Spark handling, and I am struggling to map certain concepts from one context to the other.

In particular, suppose that I had a dataset like the following

x | y
--+--
a | 5
a | 8
a | 7
b | 1

and I wanted to add a column containing the number of rows for each x value, like so:

x | y | n
--+---+---
a | 5 | 3
a | 8 | 3
a | 7 | 3
b | 1 | 1

In dplyr, I would just say:

library(tidyverse)

df <- read_csv("...")
df %>%
    group_by(x) %>%
    mutate(n = n()) %>%
    ungroup()

and that would be that. I can do something almost as simple in PySpark if I'm looking to summarize by number of rows:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

spark.read.csv("...") \
    .groupBy(col("x")) \
    .count() \
    .show()

And I thought I understood that withColumn was equivalent to dplyr's mutate. However, when I do the following, PySpark tells me that withColumn is not defined for groupBy data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

spark = SparkSession.builder.getOrCreate()

spark.read.csv("...") \
    .groupBy(col("x")) \
    .withColumn("n", count("x")) \
    .show()

In the short run, I can simply create a second dataframe containing the counts and join it to the original dataframe. However, it seems like this could become inefficient in the case of large tables. What is the canonical way to accomplish this?
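
For concreteness, the join-based workaround described above might look roughly like the following sketch. It reuses the example data from the question; the column name n and the left join are illustrative choices, not something from the original post:

import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([('a', 5), ('a', 8), ('a', 7), ('b', 1)], ['x', 'y'])

# Join workaround: compute per-group counts in a second dataframe,
# then join them back onto the original dataframe on x.
counts = df.groupBy('x').agg(f.count('x').alias('n'))
df.join(counts, on='x', how='left').show()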

Recommended Answer

When you do a groupBy(), you have to specify the aggregation before you can display the results. For example:

import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    ('a', 5),
    ('a', 8),
    ('a', 7),
    ('b', 1),
]
df = spark.createDataFrame(data, ["x", "y"])
df.groupBy('x').count().select('x', f.col('count').alias('n')).show()
#+---+---+
#|  x|  n|
#+---+---+
#|  b|  1|
#|  a|  3|
#+---+---+

Here I used alias() to rename the column. But this only returns one row per group. If you want all rows with the count appended, you can do this with a Window:

from pyspark.sql import Window
w = Window.partitionBy('x')
df.select('x', 'y', f.count('x').over(w).alias('n')).sort('x', 'y').show()
#+---+---+---+
#|  x|  y|  n|
#+---+---+---+
#|  a|  5|  3|
#|  a|  7|  3|
#|  a|  8|  3|
#|  b|  1|  1|
#+---+---+---+
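
Since the question asks about withColumn specifically: the same windowed count can also be attached with withColumn, which keeps all existing columns and is probably the closest analogue to dplyr's mutate(n = n()). A short sketch reusing the df, f, and w defined above:

# Same result via withColumn: append the windowed count as a new column n,
# keeping every existing row and column.
df.withColumn('n', f.count('x').over(w)).sort('x', 'y').show()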

Or if you're more comfortable with SQL, you can register the dataframe as a temporary table and take advantage of pyspark-sql to do the same thing:

df.createOrReplaceTempView('table')
spark.sql(
    'SELECT x, y, COUNT(x) OVER (PARTITION BY x) AS n FROM table ORDER BY x, y'
).show()
#+---+---+---+
#|  x|  y|  n|
#+---+---+---+
#|  a|  5|  3|
#|  a|  7|  3|
#|  a|  8|  3|
#|  b|  1|  1|
#+---+---+---+
