根据列值的变化对pyspark数据帧进行分区 [英] Partition pyspark dataframe based on the change in column value

查看:178
本文介绍了根据列值的变化对pyspark数据帧进行分区的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在pyspark中有一个数据框. 假设其中有一些列a,b,c ... 我想随着列值的变化将数据分组.说

I have a dataframe in pyspark. Say the has some columns a,b,c... I want to group the data into groups as the value of column changes. Say

A  B
1  x
1  y
0  x
0  y
0  x
1  y
1  x
1  y

将有3个组作为(1x,1y),(0x,0y,0x),(1y,1x,1y) 以及对应的行数据

There will be 3 groups as (1x,1y),(0x,0y,0x),(1y,1x,1y) And corresponding row data

推荐答案

如果我正确理解,您希望每次A列更改值时都创建一个不同的组.

If I understand correctly you want to create a distinct group every time column A changes values.

首先,我们将创建一个单调递增的id,以保持行顺序不变:

First we'll create a monotonically increasing id to keep the row order as it is:

import pyspark.sql.functions as psf
df = sc.parallelize([[1,'x'],[1,'y'],[0,'x'],[0,'y'],[0,'x'],[1,'y'],[1,'x'],[1,'y']])\
    .toDF(['A', 'B'])\
    .withColumn("rn", psf.monotonically_increasing_id())
df.show()

    +---+---+----------+
    |  A|  B|        rn|
    +---+---+----------+
    |  1|  x|         0|
    |  1|  y|         1|
    |  0|  x|         2|
    |  0|  y|         3|
    |  0|  x|8589934592|
    |  1|  y|8589934593|
    |  1|  x|8589934594|
    |  1|  y|8589934595|
    +---+---+----------+

现在,我们将使用窗口函数来创建一个列,该列在每次A列更改时都包含1:

Now we'll use a window function to create a column that contains 1 every time column A changes:

from pyspark.sql import Window
w = Window.orderBy('rn')
df = df.withColumn("changed", (df.A != psf.lag('A', 1, 0).over(w)).cast('int'))

    +---+---+----------+-------+
    |  A|  B|        rn|changed|
    +---+---+----------+-------+
    |  1|  x|         0|      1|
    |  1|  y|         1|      0|
    |  0|  x|         2|      1|
    |  0|  y|         3|      0|
    |  0|  x|8589934592|      0|
    |  1|  y|8589934593|      1|
    |  1|  x|8589934594|      0|
    |  1|  y|8589934595|      0|
    +---+---+----------+-------+

最后,我们将使用另一个窗口函数为每个组分配不同的数字:

Finally we'll use another window function to allocate different numbers to each group:

df = df.withColumn("group_id", psf.sum("changed").over(w)).drop("rn").drop("changed")

    +---+---+--------+
    |  A|  B|group_id|
    +---+---+--------+
    |  1|  x|       1|
    |  1|  y|       1|
    |  0|  x|       2|
    |  0|  y|       2|
    |  0|  x|       2|
    |  1|  y|       3|
    |  1|  x|       3|
    |  1|  y|       3|
    +---+---+--------+

现在您可以建立小组了

这篇关于根据列值的变化对pyspark数据帧进行分区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆