pyspark convert rows to columns


Problem description

I have a dataframe in which I need to convert rows of the same group to columns, essentially pivoting them. Below is my df.

+------------+-------+-----+-------+
|Customer    |ID     |unit |order  |
+------------+-------+-----+-------+
|John        |123    |00015|1      |
|John        |123    |00016|2      |
|John        |345    |00205|3      |
|John        |345    |00206|4      |
|John        |789    |00283|5      |
|John        |789    |00284|6      |
+------------+-------+-----+-------+

I need the resultant data for the above to look like:

+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|state   | ID_1  | unit_1 |seq_num_1 | ID_2   | unit_2 | seq_num_2 | ID_3   |unit_3 |seq_num_3 |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|John    | 123   | 00015  | 1        |  345   | 00205  | 3         |  789   |00283  | 5        |
|John    | 123   | 00016  | 2        |  345   | 00206  | 4         |  789   |00284  | 6        |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+

I tried the groupBy and pivot() functions, but it throws an error saying large pivot values were found. Is there any way to get the result without using the pivot() function? Any help is greatly appreciated. Thanks.

Recommended answer

This looks like a typical case for using the dense_rank() window function to create a generic sequence (dr in the code below) over the distinct IDs within each Customer group, and then pivoting on that sequence. We can do something similar for the order column using row_number(), so that it can be used in the groupby:

from pyspark.sql import Window, functions as F

# below I added an extra row as a reference for when the number of rows varies across IDs
df = spark.createDataFrame([
    ('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
    ('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
    ('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])

Add two window specs: w1 to get the dense_rank() of ID over Customer, and w2 to get the row_number() of order under the same Customer and ID.

w1 = Window.partitionBy('Customer').orderBy('ID')
w2 = Window.partitionBy('Customer','ID').orderBy('order')

Add two new columns based on the above two window specs: dr (dense_rank) and sid (row_number):

df1 = df.select(
    "*",
    F.dense_rank().over(w1).alias('dr'),
    F.row_number().over(w2).alias('sid')
)
df1.show()
+--------+---+-----+-----+---+---+
|Customer| ID| unit|order| dr|sid|
+--------+---+-----+-----+---+---+
|    John|123|00015|    1|  1|  1|
|    John|123|00016|    2|  1|  2|
|    John|345|00205|    3|  2|  1|
|    John|345|00206|    4|  2|  2|
|    John|789|00283|    5|  3|  1|
|    John|789|00284|    6|  3|  2|
|    John|789|00285|    7|  3|  3|
+--------+---+-----+-----+---+---+

Find the max(dr), so that we can pre-define the list of values to pivot on, which is range(1, N+1). Supplying the value list up front improves the efficiency of the pivot method, since Spark does not have to run an extra pass to discover the distinct pivot values.

N = df1.agg(F.max('dr')).first()[0]
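# with the sample data above, the largest dr is 3, so N evaluates to 3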

Group by Customer and sid, pivot on dr, and then aggregate:

df_new = df1.groupby('Customer','sid') \
    .pivot('dr', range(1,N+1)) \
    .agg(
        F.first('ID').alias('ID'),
        F.first('unit').alias('unit'),
        F.first('order').alias('order')
)

df_new.show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|1_ID|1_unit|1_order|2_ID|2_unit|2_order|3_ID|3_unit|3_order|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|    John|  1| 123| 00015|      1| 345| 00205|      3| 789| 00283|      5|
|    John|  2| 123| 00016|      2| 345| 00206|      4| 789| 00284|      6|
|    John|  3|null|  null|   null|null|  null|   null| 789| 00285|      7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+

Rename the columns if needed; the pivot produces names like 1_ID, and splitting each name on its first underscore and reversing the two parts turns it into ID_1:

import re
df_new.toDF(*['_'.join(reversed(re.split('_',c,1))) for c in df_new.columns]).show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|ID_1|unit_1|order_1|ID_2|unit_2|order_2|ID_3|unit_3|order_3|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|    John|  1| 123| 00015|      1| 345| 00205|      3| 789| 00283|      5|
|    John|  2| 123| 00016|      2| 345| 00206|      4| 789| 00284|      6|
|    John|  3|null|  null|   null|null|  null|   null| 789| 00285|      7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
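
Finally, if you want to avoid pivot() entirely, as the question asks, the same dr and sid columns can be combined with conditional aggregation instead. A minimal sketch, assuming the df1 and N defined above (df_alt is just an illustrative name for the result):

from pyspark.sql import functions as F

exprs = []
for i in range(1, N + 1):
    for c in ['ID', 'unit', 'order']:
        # keep column `c` only from the row whose dr equals i; rows with a different dr
        # contribute null, which first(..., ignorenulls=True) skips over
        exprs.append(
            F.first(F.when(F.col('dr') == i, F.col(c)), ignorenulls=True).alias(f'{c}_{i}')
        )

df_alt = df1.groupby('Customer', 'sid').agg(*exprs)
df_alt.show()

This yields the same ID_1, unit_1, order_1, ... layout as the renamed pivot output above, without calling pivot() and without the separate rename step, at the cost of writing out 3 * N aggregate expressions.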

