pyspark convert rows to columns

Question

I have a dataframe where I need to convert rows of the same group to columns, essentially pivoting them. Below is my df:

+------------+-------+-----+-------+
|Customer    |ID     |unit |order  |
+------------+-------+-----+-------+
|John        |123    |00015|1      |
|John        |123    |00016|2      |
|John        |345    |00205|3      |
|John        |345    |00206|4      |
|John        |789    |00283|5      |
|John        |789    |00284|6      |
+------------+-------+-----+-------+

I need the resultant data for the above to look like this:

+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|Customer| ID_1  | unit_1 |seq_num_1 | ID_2   | unit_2 | seq_num_2 | ID_3   |unit_3 |seq_num_3 |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|John    | 123   | 00015  | 1        |  345   | 00205  | 3         |  789   |00283  | 5        |
|John    | 123   | 00016  | 2        |  345   | 00206  | 4         |  789   |00284  | 6        |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+

I tried groupBy with the pivot() function, but it throws an error saying too many pivot values were found. Is there any way to get this result without using pivot()? Any help is greatly appreciated. Thanks.
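
Presumably the original attempt pivoted directly on a high-cardinality column; the sketch below is a guess at what that looked like (the question does not show the exact call). When the pivot column has more distinct values than spark.sql.pivotMaxValues (10000 by default) and no explicit value list is passed, Spark aborts with an error about too many distinct pivot values:

from pyspark.sql import functions as F

# Hypothetical reconstruction of the failing attempt: pivoting directly on ID.
# With many distinct IDs in the real data this exceeds spark.sql.pivotMaxValues
# unless that setting is raised or an explicit value list is passed to pivot().
df.groupBy('Customer').pivot('ID').agg(F.first('unit'), F.first('order'))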

Answer

This looks like a typical case for the dense_rank() window function: create a generic sequence (dr in the code below) over the distinct IDs within each Customer group, then pivot on that sequence. We can do something similar for the order column using row_number(), so that the resulting sequence (sid) can be used in the groupby:

from pyspark.sql import Window, functions as F

# an extra row is added for reference, to show what happens when the number of rows varies across IDs
df = spark.createDataFrame([
    ('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
    ('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
    ('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])
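
Note that order is stored as a string in this sample DataFrame; for single-digit values string ordering matches numeric ordering, but if an ID can have ten or more rows you would want to cast it first (e.g. F.col('order').cast('int')) before using it in the window's orderBy.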

Add two Window specs: w1 to get the dense_rank() of ID over each Customer, and w2 to get the row_number() of order within the same Customer and ID.

w1 = Window.partitionBy('Customer').orderBy('ID')
w2 = Window.partitionBy('Customer','ID').orderBy('order')

Add two new columns based on the above two Window specs: dr (dense_rank) and sid (row_number):

df1 = df.select(
    "*", 
    F.dense_rank().over(w1).alias('dr'), 
    F.row_number().over(w2).alias('sid')
)
df1.show()
+--------+---+-----+-----+---+---+
|Customer| ID| unit|order| dr|sid|
+--------+---+-----+-----+---+---+
|    John|123|00015|    1|  1|  1|
|    John|123|00016|    2|  1|  2|
|    John|345|00205|    3|  2|  1|
|    John|345|00206|    4|  2|  2|
|    John|789|00283|    5|  3|  1|
|    John|789|00284|    6|  3|  2|
|    John|789|00285|    7|  3|  3|
+--------+---+-----+-----+---+---+

Find max(dr) so that we can pre-define the list of values to pivot on, which is range(1, N+1) (supplying the values up front improves the efficiency of the pivot):

N = df1.agg(F.max('dr')).first()[0]
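
For the sample data above N is 3, so the list of pivot values is [1, 2, 3]. Supplying the values explicitly lets Spark skip the extra job it would otherwise run to collect the distinct values of dr, and it also sidesteps the pivot-value limit mentioned in the question.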

Group by Customer and sid, pivot on dr, and then aggregate:

df_new = df1.groupby('Customer','sid') \
    .pivot('dr', range(1,N+1)) \
    .agg(
        F.first('ID').alias('ID'),
        F.first('unit').alias('unit'),
        F.first('order').alias('order')
)

df_new.show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|1_ID|1_unit|1_order|2_ID|2_unit|2_order|3_ID|3_unit|3_order|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|    John|  1| 123| 00015|      1| 345| 00205|      3| 789| 00283|      5|
|    John|  2| 123| 00016|      2| 345| 00206|      4| 789| 00284|      6|
|    John|  3|null|  null|   null|null|  null|   null| 789| 00285|      7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
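
Each (Customer, sid, dr) combination matches at most one input row, so F.first simply picks up that row's ID, unit and order. The null cells in the sid = 3 row appear because only ID 789 has a third order row in the sample data.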

Rename the column names if needed:

import re
df_new.toDF(*['_'.join(reversed(re.split('_',c,1))) for c in df_new.columns]).show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|ID_1|unit_1|order_1|ID_2|unit_2|order_2|ID_3|unit_3|order_3|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|    John|  1| 123| 00015|      1| 345| 00205|      3| 789| 00283|      5|
|    John|  2| 123| 00016|      2| 345| 00206|      4| 789| 00284|      6|
|    John|  3|null|  null|   null|null|  null|   null| 789| 00285|      7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
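
Here re.split('_', c, 1) splits each column name on the first underscore only and reversed(...) swaps the two pieces, so for example 1_ID becomes ID_1; Customer and sid contain no underscore and are left unchanged.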
