Adding a group count column to a PySpark dataframe


Question

I am coming from R and the tidyverse to PySpark due to its superior Spark handling, and I am struggling to map certain concepts from one context to the other.

In particular, suppose that I had a dataset like the following

x | y
--+--
a | 5
a | 8
a | 7
b | 1

and I wanted to add a column containing the number of rows for each x value, like so:

x | y | n
--+---+---
a | 5 | 3
a | 8 | 3
a | 7 | 3
b | 1 | 1

In dplyr, I would just say:

library(tidyverse)

df <- read_csv("...")
df %>%
    group_by(x) %>%
    mutate(n = n()) %>%
    ungroup()

and that would be that. I can do something almost as simple in PySpark if I'm looking to summarize by number of rows:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

spark.read.csv("...") \
    .groupBy(col("x")) \
    .count() \
    .show()

And I thought I understood that withColumn was equivalent to dplyr's mutate. However, when I do the following, PySpark tells me that withColumn is not defined for groupBy data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

spark = SparkSession.builder.getOrCreate()

spark.read.csv("...") \
    .groupBy(col("x")) \
    .withColumn("n", count("x")) \
    .show()

In the short run, I can simply create a second dataframe containing the counts and join it to the original dataframe. However, it seems like this could become inefficient in the case of large tables. What is the canonical way to accomplish this?
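
A minimal sketch of that join-based workaround, assuming df holds the data above with columns x and y:

from pyspark.sql.functions import count

# Aggregate the per-group counts, then join them back onto every original row.
counts = df.groupBy("x").agg(count("x").alias("n"))
df_with_n = df.join(counts, on="x", how="left")
df_with_n.show()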

Answer

When you do a groupBy(), you have to specify the aggregation before you can display the results. For example:

import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    ('a', 5),
    ('a', 8),
    ('a', 7),
    ('b', 1),
]
df = spark.createDataFrame(data, ["x", "y"])
df.groupBy('x').count().select('x', f.col('count').alias('n')).show()
#+---+---+
#|  x|  n|
#+---+---+
#|  b|  1|
#|  a|  3|
#+---+---+

Here I used alias() to rename the column. But this only returns one row per group. If you want all rows with the count appended, you can do this with a Window:

from pyspark.sql import Window
w = Window.partitionBy('x')
df.select('x', 'y', f.count('x').over(w).alias('n')).sort('x', 'y').show()
#+---+---+---+
#|  x|  y|  n|
#+---+---+---+
#|  a|  5|  3|
#|  a|  7|  3|
#|  a|  8|  3|
#|  b|  1|  1|
#+---+---+---+
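
The same window can also be applied with withColumn on the (ungrouped) DataFrame, which is the closest analogue to dplyr's mutate and explains the error in the question: withColumn is defined on DataFrame, not on GroupedData. A minimal sketch reusing the df and w defined above:

# The grouping happens inside the window specification, not via groupBy(),
# so withColumn can append the per-group count to every row.
df.withColumn('n', f.count('x').over(w)).sort('x', 'y').show()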

Or if you're more comfortable with SQL, you can register the dataframe as a temporary table and take advantage of pyspark-sql to do the same thing:

df.createOrReplaceTempView('table')
spark.sql(
    'SELECT x, y, COUNT(x) OVER (PARTITION BY x) AS n FROM table ORDER BY x, y'
).show()
#+---+---+---+
#|  x|  y|  n|
#+---+---+---+
#|  a|  5|  3|
#|  a|  7|  3|
#|  a|  8|  3|
#|  b|  1|  1|
#+---+---+---+
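
The window expression can also be written inline with selectExpr, which skips the temporary view; a minimal sketch using the same df, assuming your Spark version parses window functions inside SQL expressions (recent versions do):

# The OVER clause is parsed as part of the SQL expression; no temp view is needed.
df.selectExpr('x', 'y', 'count(x) over (partition by x) as n').sort('x', 'y').show()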
