How to pivot on multiple columns in Spark SQL?


Problem description


I need to pivot more than one column in a pyspark dataframe. Sample dataframe,

>>> from pyspark.sql import functions as F
>>> d = [(100,1,23,10),(100,2,45,11),(100,3,67,12),(100,4,78,13),(101,1,23,10),(101,2,45,13),(101,3,67,14),(101,4,78,15),(102,1,23,10),(102,2,45,11),(102,3,67,16),(102,4,78,18)]
>>> mydf = spark.createDataFrame(d,['id','day','price','units'])
>>> mydf.show()
+---+---+-----+-----+
| id|day|price|units|
+---+---+-----+-----+
|100|  1|   23|   10|
|100|  2|   45|   11|
|100|  3|   67|   12|
|100|  4|   78|   13|
|101|  1|   23|   10|
|101|  2|   45|   13|
|101|  3|   67|   14|
|101|  4|   78|   15|
|102|  1|   23|   10|
|102|  2|   45|   11|
|102|  3|   67|   16|
|102|  4|   78|   18|
+---+---+-----+-----+

Now, if I need to get the price column into a row for each id based on day, I can use the pivot method as follows:

>>> pvtdf = mydf.withColumn('combcol',F.concat(F.lit('price_'),mydf['day'])).groupby('id').pivot('combcol').agg(F.first('price'))
>>> pvtdf.show()
+---+-------+-------+-------+-------+
| id|price_1|price_2|price_3|price_4|
+---+-------+-------+-------+-------+
|100|     23|     45|     67|     78|
|101|     23|     45|     67|     78|
|102|     23|     45|     67|     78|
+---+-------+-------+-------+-------+

So when I need the units column to be transposed in the same way as price, I would have to create one more dataframe as above for units and then join the two using id. But since I may have more columns like this, I tried writing a function to do it:

>>> def pivot_udf(df,*cols):
...     mydf = df.select('id').drop_duplicates()
...     for c in cols:
...        mydf = mydf.join(df.withColumn('combcol',F.concat(F.lit('{}_'.format(c)),df['day'])).groupby('id').pivot('combcol').agg(F.first(c)),'id')
...     return mydf
...
>>> pivot_udf(mydf,'price','units').show()
+---+-------+-------+-------+-------+-------+-------+-------+-------+
| id|price_1|price_2|price_3|price_4|units_1|units_2|units_3|units_4|
+---+-------+-------+-------+-------+-------+-------+-------+-------+
|100|     23|     45|     67|     78|     10|     11|     12|     13|
|101|     23|     45|     67|     78|     10|     13|     14|     15|
|102|     23|     45|     67|     78|     10|     11|     16|     18|
+---+-------+-------+-------+-------+-------+-------+-------+-------+

I need suggestions on whether this is good practice, and whether there is a better way of doing it. Thanks in advance!

Solution

Here's a non-UDF way involving a single pivot (hence, just a single column scan to identify all the unique dates).

dff = mydf.groupBy('id').pivot('day').agg(F.first('price').alias('price'),F.first('units').alias('unit'))

Here's the result (apologies for the non-matching ordering and naming):

+---+-------+------+-------+------+-------+------+-------+------+               
| id|1_price|1_unit|2_price|2_unit|3_price|3_unit|4_price|4_unit|
+---+-------+------+-------+------+-------+------+-------+------+
|100|     23|    10|     45|    11|     67|    12|     78|    13|
|101|     23|    10|     45|    13|     67|    14|     78|    15|
|102|     23|    10|     45|    11|     67|    16|     78|    18|
+---+-------+------+-------+------+-------+------+-------+------+

We simply aggregate on both the price and the units columns after pivoting on day.
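As an aside, a pivot() call without an explicit list of values makes Spark run an extra pass to collect the distinct values of the pivot column. If the days are known up front (1 through 4 in this sample data, an assumption), they can be passed in directly. A minimal sketch:

# Sketch: explicit pivot values, assuming the days are exactly 1-4.
# Same result as above, but skips the distinct-value discovery pass.
dff = mydf.groupBy('id').pivot('day', [1, 2, 3, 4]).agg(
    F.first('price').alias('price'),
    F.first('units').alias('unit'))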

If the naming needs to match the question,

dff.select([F.col(c).name('_'.join(x for x in c.split('_')[::-1])) for c in dff.columns]).show()

+---+-------+------+-------+------+-------+------+-------+------+
| id|price_1|unit_1|price_2|unit_2|price_3|unit_3|price_4|unit_4|
+---+-------+------+-------+------+-------+------+-------+------+
|100|     23|    10|     45|    11|     67|    12|     78|    13|
|101|     23|    10|     45|    13|     67|    14|     78|    15|
|102|     23|    10|     45|    11|     67|    16|     78|    18|
+---+-------+------+-------+------+-------+------+-------+------+
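An equivalent way to do the renaming is DataFrame.toDF, which takes the full list of new column names. A sketch, under the assumption that every column is either 'id' or of the form '<day>_<field>':

# 'id' maps to itself; '1_price' becomes 'price_1', '1_unit' becomes 'unit_1', etc.
renamed = dff.toDF(*['_'.join(reversed(c.split('_'))) for c in dff.columns])
renamed.show()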
