Groupby cumcount in PySpark


Problem description

I have a dataframe as follows:

---------------
id   | name   |
---------------
 1   | joe    |
 1   | john   |
 2   | jane   |
 3   | jo     |
---------------

The goal is, if the 'id' column is duplicate, add ascending number to it starting from 1.

In Pandas, I can do it this way:

# 0-based cumulative count of each row within its id group
count_id = df.groupby(['id']).cumcount()
# drop the 0 for the first occurrence and convert to string
count_num = count_id.replace(0, '').astype(str)
# append the counter to the id (assumes 'id' is a string column)
df['id'] += count_num

I tried to use the same logic in PySpark with no success.

The result should be:

---------------
id   | name   |
---------------
 1   | joe    |
 11  | john   |
 2   | jane   |
 3   | jo     |
---------------

How do I achieve the same in PySpark? Any help is greatly appreciated.
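
For reference, the example DataFrame above can be built like this (a minimal sketch, assuming an active SparkSession is available as spark; the variable names are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample data matching the table above
df = spark.createDataFrame(
    [(1, "joe"), (1, "john"), (2, "jane"), (3, "jo")],
    ["id", "name"],
)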

Recommended answer

To replicate that output, you can use a Window to get the row_number for each id, and then concat to add it to the id.

import pyspark.sql.functions as f
from pyspark.sql import Window

# 0-based row number within each id, ordered by name
w = Window.partitionBy("id").orderBy("name")
df.withColumn("row_number", f.row_number().over(w) - 1)\
    .withColumn(
        "id", 
        f.when(
            f.col("row_number") > 0, 
            f.concat(f.col("id"), f.col("row_number"))
        ).otherwise(f.col("id"))
    )\
    .drop("row_number")\
    .show()
#+---+----+
#| id|name|
#+---+----+
#|  1| joe|
#| 11|john|
#|  3|  jo|
#|  2|jane|
#+---+----+

Note: This will convert the id column into a StringType column if it isn't already.
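
To double-check, you can inspect the schema; a quick sketch, assuming the result of the snippet above is assigned to a variable (here called result) instead of being shown directly:

# concat always produces a string column, so id comes back as StringType
result.printSchema()
#root
# |-- id: string (nullable = true)
# |-- name: string (nullable = true)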

In order to get the output you originally stated in the question as the desired result, you'd have to add a group count column in addition to calculating the row number. Only concatenate the row number if the count is greater than one.

import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.partitionBy("id")
# size of each id group and a 1-based row number within it
df.withColumn("count", f.count("*").over(w))\
    .withColumn("row_number", f.row_number().over(w.orderBy("name")))\
    .withColumn(
        "id", 
        f.when(
            f.col("count") > 1, 
            f.concat(f.col("id"), f.col("row_number"))
        ).otherwise(f.col("id"))
    )\
    .drop("count", "row_number")\
    .show()
#+---+----+
#| id|name|
#+---+----+
#| 11| joe|
#| 12|john|
#|  3|  jo|
#|  2|jane|
#+---+----+
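
For completeness, the direct equivalent of the pandas cumcount itself (a 0-based counter within each group) is row_number minus one. A short sketch; note that, unlike pandas, Spark has no inherent row order, so an explicit ordering column is needed (name is used here for illustration):

import pyspark.sql.functions as f
from pyspark.sql import Window

# 0-based cumulative count per id, ordered by name
w = Window.partitionBy("id").orderBy("name")
df.withColumn("cumcount", f.row_number().over(w) - 1).show()
# (partition order in the output may vary)
#+---+----+--------+
#| id|name|cumcount|
#+---+----+--------+
#|  1| joe|       0|
#|  1|john|       1|
#|  3|  jo|       0|
#|  2|jane|       0|
#+---+----+--------+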
