pyspark convert rows to columns
Question
I have a dataframe where I need to convert rows of the same group into columns, basically pivoting them. Below is my df:
+------------+-------+-----+-------+
|Customer |ID |unit |order |
+------------+-------+-----+-------+
|John |123 |00015|1 |
|John |123 |00016|2 |
|John |345 |00205|3 |
|John |345 |00206|4 |
|John |789 |00283|5 |
|John |789 |00284|6 |
+------------+-------+-----+-------+
I need the resultant data for the above to look like:
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|Customer| ID_1 | unit_1 |seq_num_1 | ID_2 | unit_2 | seq_num_2 | ID_3 |unit_3 |seq_num_3 |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
|John | 123 | 00015 | 1 | 345 | 00205 | 3 | 789 |00283 | 5 |
|John | 123 | 00016 | 2 | 345 | 00206 | 4 | 789 |00284 | 6 |
+--------+-------+--------+----------+--------+--------+-----------+--------+-------+----------+
I tried the groupBy and pivot() functions, but it throws an error saying too many pivot values were found. Is there any way to get the result without using the pivot() function? Any help is greatly appreciated. Thanks.
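As an aside, the "large pivot values" error is typically Spark's guard against pivoting on a column with too many distinct values, controlled by the spark.sql.pivotMaxValues setting (10000 by default). If the distinct count is genuinely expected, raising that limit is one option; a minimal sketch, assuming a live SparkSession named spark:

```python
# Assumption: the error comes from Spark's pivotMaxValues guard (default 10000).
# Only raise this when the number of distinct pivot values is genuinely expected;
# a very wide pivot is usually a sign the query should be restructured instead.
spark.conf.set("spark.sql.pivotMaxValues", 50000)
```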
Answer
This looks like a typical case for using the dense_rank() window function to create a generic sequence (dr in the code below) of distinct IDs under each group of Customer, and then pivoting on this sequence. We can do the same for the order column using row_number() so that it can be used in the groupBy:
from pyspark.sql import Window, functions as F

# an extra row is added below for reference, for the case where the number of rows varies across IDs
df = spark.createDataFrame([
    ('John', '123', '00015', '1'), ('John', '123', '00016', '2'), ('John', '345', '00205', '3'),
    ('John', '345', '00206', '4'), ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
    ('John', '789', '00285', '7')
], ['Customer', 'ID', 'unit', 'order'])
Add two Window specs: w1 to get the dense_rank() of IDs over each Customer, and w2 to get the row_number() of order under the same Customer and ID.
w1 = Window.partitionBy('Customer').orderBy('ID')
w2 = Window.partitionBy('Customer','ID').orderBy('order')
Add two new columns based on the above two Window specs: dr (dense_rank) and sid (row_number):
df1 = df.select(
    "*",
    F.dense_rank().over(w1).alias('dr'),
    F.row_number().over(w2).alias('sid')
)
+--------+---+-----+-----+---+---+
|Customer| ID| unit|order| dr|sid|
+--------+---+-----+-----+---+---+
| John|123|00015| 1| 1| 1|
| John|123|00016| 2| 1| 2|
| John|345|00205| 3| 2| 1|
| John|345|00206| 4| 2| 2|
| John|789|00283| 5| 3| 1|
| John|789|00284| 6| 3| 2|
| John|789|00285| 7| 3| 3|
+--------+---+-----+-----+---+---+
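For intuition, the dr and sid values in the table above can be reproduced in plain Python (a minimal sketch that mirrors the window-function logic, not a replacement for Spark):

```python
# Plain-Python sketch of what the two window functions compute on the sample
# rows (Customer, ID, unit, order).
rows = [
    ('John', '123', '00015', '1'), ('John', '123', '00016', '2'),
    ('John', '345', '00205', '3'), ('John', '345', '00206', '4'),
    ('John', '789', '00283', '5'), ('John', '789', '00284', '6'),
    ('John', '789', '00285', '7'),
]

# dense_rank() over (partition by Customer, order by ID):
# the rank of each distinct ID within its Customer partition
ids_per_customer = {}
for cust, id_, _, _ in rows:
    ids_per_customer.setdefault(cust, set()).add(id_)
dr = {(cust, id_): sorted(ids).index(id_) + 1
      for cust, ids in ids_per_customer.items() for id_ in ids}

# row_number() over (partition by Customer, ID, order by order):
# a running position within each (Customer, ID) group; rows are already sorted by order
counts = {}
ranked = []
for cust, id_, unit, order in rows:
    sid = counts.get((cust, id_), 0) + 1
    counts[(cust, id_)] = sid
    ranked.append((cust, id_, unit, order, dr[(cust, id_)], sid))
```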
Find the max(dr), so that we can pre-define the list to pivot on, which is range(1, N+1) (this will improve the efficiency of the pivot method):
N = df1.agg(F.max('dr')).first()[0]
Group by Customer and sid, pivot on dr, and then do the aggregation:
df_new = df1.groupby('Customer', 'sid') \
    .pivot('dr', range(1, N+1)) \
    .agg(
        F.first('ID').alias('ID'),
        F.first('unit').alias('unit'),
        F.first('order').alias('order')
    )
df_new.show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|1_ID|1_unit|1_order|2_ID|2_unit|2_order|3_ID|3_unit|3_order|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
| John| 1| 123| 00015| 1| 345| 00205| 3| 789| 00283| 5|
| John| 2| 123| 00016| 2| 345| 00206| 4| 789| 00284| 6|
| John| 3|null| null| null|null| null| null| 789| 00285| 7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
Rename the column names if needed:
import re
df_new.toDF(*['_'.join(reversed(re.split('_',c,1))) for c in df_new.columns]).show()
+--------+---+----+------+-------+----+------+-------+----+------+-------+
|Customer|sid|ID_1|unit_1|order_1|ID_2|unit_2|order_2|ID_3|unit_3|order_3|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
| John| 1| 123| 00015| 1| 345| 00205| 3| 789| 00283| 5|
| John| 2| 123| 00016| 2| 345| 00206| 4| 789| 00284| 6|
| John| 3|null| null| null|null| null| null| 789| 00285| 7|
+--------+---+----+------+-------+----+------+-------+----+------+-------+
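The renaming expression just moves the rank prefix to a suffix, and it can be sanity-checked without Spark; a minimal sketch using a subset of the column names from the output above:

```python
import re

cols = ['Customer', 'sid', '1_ID', '1_unit', '1_order', '2_ID', '2_unit', '2_order']

# Split on the first '_' only, reverse the two parts, and re-join:
# '1_ID' -> ['1', 'ID'] -> 'ID_1'; names without '_' pass through unchanged,
# since splitting them yields a single-element list.
renamed = ['_'.join(reversed(re.split('_', c, maxsplit=1))) for c in cols]
```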