PySpark Dataframe cast two columns into a new column of tuples based on the value of a third column


Question

As the subject describes, I have a PySpark Dataframe that I need to cast two columns into a new column that is a list of tuples based on the value of a third column. This cast will reduce or flatten the dataframe by a key value, product id in this case, and the result is one row per key.

There are hundreds of millions of rows in this dataframe, with 37M unique product ids. Therefore I need a way to do the transformation on the Spark cluster without bringing back any data to the driver (Jupyter in this case).

Here is an extract of my dataframe for just 1 product:

+-----------+-------------------+-------------+--------+----------+---------------+
| product_id|      purchase_date|days_warranty|store_id|year_month|       category|
+-----------+-------------------+-------------+--------+----------+---------------+
|02147465400|2017-05-16 00:00:00|           30|     205|   2017-05|     CATEGORY A|
|02147465400|2017-04-15 00:00:00|           30|     205|   2017-04|     CATEGORY A|
|02147465400|2018-07-11 00:00:00|           30|     205|   2018-07|     CATEGORY A|
|02147465400|2017-06-14 00:00:00|           30|     205|   2017-06|     CATEGORY A|
|02147465400|2017-03-16 00:00:00|           30|     205|   2017-03|     CATEGORY A|
|02147465400|2017-08-14 00:00:00|           30|     205|   2017-08|     CATEGORY A|
|02147465400|2017-09-12 00:00:00|           30|     205|   2017-09|     CATEGORY A|
|02147465400|2017-01-21 00:00:00|           30|     205|   2017-01|     CATEGORY A|
|02147465400|2018-08-14 00:00:00|           30|     205|   2018-08|     CATEGORY A|
|02147465400|2018-08-23 00:00:00|           30|     205|   2018-08|     CATEGORY A|
|02147465400|2017-10-11 00:00:00|           30|     205|   2017-10|     CATEGORY A|
|02147465400|2017-12-12 00:00:00|           30|     205|   2017-12|     CATEGORY A|
|02147465400|2017-02-15 00:00:00|           30|     205|   2017-02|     CATEGORY A|
|02147465400|2018-04-12 00:00:00|           30|     205|   2018-04|     CATEGORY A|
|02147465400|2018-03-12 00:00:00|           30|     205|   2018-03|     CATEGORY A|
|02147465400|2018-05-15 00:00:00|           30|     205|   2018-05|     CATEGORY A|
|02147465400|2018-02-12 00:00:00|           30|     205|   2018-02|     CATEGORY A|
|02147465400|2018-06-14 00:00:00|           30|     205|   2018-06|     CATEGORY A|
|02147465400|2018-01-11 00:00:00|           30|     205|   2018-01|     CATEGORY A|
|02147465400|2017-07-20 00:00:00|           30|     205|   2017-07|     CATEGORY A|
|02147465400|2017-11-11 00:00:00|           30|     205|   2017-11|     CATEGORY A|
|02147465400|2017-01-05 00:00:00|           90|     205|   2017-01|     CATEGORY B|
|02147465400|2017-01-21 00:00:00|           90|     205|   2017-01|     CATEGORY B|
|02147465400|2017-10-09 00:00:00|           90|     205|   2017-10|     CATEGORY B|
|02147465400|2018-07-11 00:00:00|           90|     205|   2018-07|     CATEGORY B|
|02147465400|2017-04-16 00:00:00|           90|     205|   2017-04|     CATEGORY B|
|02147465400|2018-09-16 00:00:00|           90|     205|   2018-09|     CATEGORY B|
|02147465400|2018-04-14 00:00:00|           90|     205|   2018-04|     CATEGORY B|
|02147465400|2018-01-12 00:00:00|           90|     205|   2018-01|     CATEGORY B|
|02147465400|2017-07-15 00:00:00|           90|     205|   2017-07|     CATEGORY B|
+-----------+-------------------+-------------+--------+----------+---------------+

Here is the desired resulting dataframe, one row for the one product, where the rows of the original dataframe have the purchase_date and days_warranty columns cast as an array of tuples into new columns based on the category column value:

+-----------+----------------------------+----------------------------+
| product_id|                  CATEGORY A|                  CATEGORY B| 
+-----------+----------------------------+----------------------------+
|02147465400| [ (2017-05-16 00:00:00,30),| [ (2017-01-05 00:00:00,90),| 
|           |   (2017-04-15 00:00:00,30),|   (2017-01-21 00:00:00,90),|
|           |   (2018-07-11 00:00:00,30),|   (2017-10-09 00:00:00,90),|
|           |   (2017-06-14 00:00:00,30),|   (2018-07-11 00:00:00,90),|
|           |   (2017-03-16 00:00:00,30),|   (2017-04-16 00:00:00,90),|
|           |   (2017-08-14 00:00:00,30),|   (2018-09-16 00:00:00,90),|
|           |   (2017-09-12 00:00:00,30),|   (2018-04-14 00:00:00,90),|
|           |   (2017-01-21 00:00:00,30),|   (2018-01-12 00:00:00,90),|
|           |   (2018-08-14 00:00:00,30),|   (2017-07-15 00:00:00,90) |
|           |   (2018-08-23 00:00:00,30),| ]                          |
|           |   (2017-10-11 00:00:00,30),|                            |
|           |   (2017-12-12 00:00:00,30),|                            |
|           |   (2017-02-15 00:00:00,30),|                            |
|           |   (2018-04-12 00:00:00,30),|                            |
|           |   (2018-03-12 00:00:00,30),|                            |
|           |   (2018-05-15 00:00:00,30),|                            |
|           |   (2018-02-12 00:00:00,30),|                            |
|           |   (2018-06-14 00:00:00,30),|                            |
|           |   (2018-01-11 00:00:00,30),|                            |
|           |   (2017-07-20 00:00:00,30),|                            |
|           |   (2017-11-11 00:00:00,30) |                            |
|           | ]                          |                            |
+-----------+----------------------------+----------------------------+
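Note that Spark has no tuple type, so in practice the "tuples" above would be structs, and each pivoted category column would be an array of structs. The resulting schema would look roughly like this (a sketch, assuming purchase_date is a timestamp and days_warranty an integer):

root
 |-- product_id: string (nullable = true)
 |-- CATEGORY A: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- purchase_date: timestamp (nullable = true)
 |    |    |-- days_warranty: integer (nullable = true)
 |-- CATEGORY B: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- purchase_date: timestamp (nullable = true)
 |    |    |-- days_warranty: integer (nullable = true)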

Answer

Assuming your Dataframe is called df:

from pyspark.sql.functions import struct, collect_list

# Pack purchase_date and days_warranty into one struct column, then
# pivot so each category becomes its own column of collected structs
gdf = (df
       .select("product_id", "category",
               struct("purchase_date", "days_warranty").alias("pd_wd"))
       .groupBy("product_id")
       .pivot("category")
       .agg(collect_list("pd_wd")))
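One performance note: pivot() without an explicit list of values triggers an extra pass over the data just to discover the distinct categories. At this scale it may be worth passing them up front (a sketch; the category list here is assumed from the sample above):

gdf = (df
       .select("product_id", "category",
               struct("purchase_date", "days_warranty").alias("pd_wd"))
       .groupBy("product_id")
       .pivot("category", ["CATEGORY A", "CATEGORY B"])
       .agg(collect_list("pd_wd")))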

Essentially, you have to group purchase_date and days_warranty into a single column using struct(). Then you just group by product_id, pivot by category, and aggregate with collect_list().
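Here is a minimal, self-contained sketch of the whole pipeline (the SparkSession setup and the two sample rows are invented for illustration; how show() renders structs varies by Spark version):

from pyspark.sql import SparkSession
from pyspark.sql.functions import struct, collect_list

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Two rows for one product, one per category, mirroring the sample data
df = spark.createDataFrame(
    [("02147465400", "2017-05-16 00:00:00", 30, "CATEGORY A"),
     ("02147465400", "2017-01-05 00:00:00", 90, "CATEGORY B")],
    ["product_id", "purchase_date", "days_warranty", "category"],
)

gdf = (df
       .select("product_id", "category",
               struct("purchase_date", "days_warranty").alias("pd_wd"))
       .groupBy("product_id")
       .pivot("category")
       .agg(collect_list("pd_wd")))

gdf.show(truncate=False)
# +-----------+----------------------------+----------------------------+
# |product_id |CATEGORY A                  |CATEGORY B                  |
# +-----------+----------------------------+----------------------------+
# |02147465400|[{2017-05-16 00:00:00, 30}] |[{2017-01-05 00:00:00, 90}] |
# +-----------+----------------------------+----------------------------+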

