Duplicating records between date gaps within a selected time interval in a PySpark dataframe


Problem description

I have a PySpark dataframe that keeps track of changes in a product's price and status over months. This means that a new row is created only when a change occurs (in either status or price) compared to the previous month, as in the dummy data below

    ----------------------------------------
    |product_id| status    | price| month  |
    ----------------------------------------
    |1         | available | 5    | 2019-10|
    ----------------------------------------
    |1         | available | 8    | 2020-08|
    ----------------------------------------
    |1         | limited   | 8    | 2020-10|
    ----------------------------------------
    |2         | limited   | 1    | 2020-09|
    ----------------------------------------
    |2         | limited   | 3    | 2020-10|
    ----------------------------------------

I would like to create a dataframe that shows the values for each of the last 6 months. This means that I need to duplicate the records whenever there is a gap in the above dataframe. For example, if the last 6 months are 2020-07, 2020-08, ... 2020-12, then the result for the above dataframe should be

    ----------------------------------------
    |product_id| status    | price| month  |
    ----------------------------------------
    |1         | available | 5    | 2020-07|
    ----------------------------------------
    |1         | available | 8    | 2020-08|
    ----------------------------------------
    |1         | available | 8    | 2020-09|
    ----------------------------------------
    |1         | limited   | 8    | 2020-10|
    ----------------------------------------
    |1         | limited   | 8    | 2020-11|
    ----------------------------------------
    |1         | limited   | 8    | 2020-12|
    ----------------------------------------
    |2         | limited   | 1    | 2020-09|
    ----------------------------------------
    |2         | limited   | 3    | 2020-10|
    ----------------------------------------
    |2         | limited   | 3    | 2020-11|
    ----------------------------------------
    |2         | limited   | 3    | 2020-12|
    ----------------------------------------

Notice that for product_id = 1 there was an older record from 2019-10 that was propagated until 2020-08 and then trimmed, whereas for product_id = 2 there were no records prior to 2020-09 and thus the months 2020-07, 2020-08 were not filled for it (as the product did not exist prior to 2020-09).

Since the dataframe consists of millions of records, a "brute-force" solution using for loops and checking for each product_id is rather slow. It seems that it should be possible to solve this using window functions, by creating another column next_month and then filling in the gaps based on that column, but I don't know how to achieve that.
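The gap-filling idea itself can be illustrated with a minimal plain-Python sketch (a hypothetical helper, not the Spark solution below; months are represented as (year, month) tuples so that they compare correctly across year boundaries):

```python
def fill_month_gaps(records, end_month):
    """Forward-fill monthly gaps for a single product.

    records: list of (month, status, price) change events sorted by month,
             where month is a (year, mon) tuple.
    end_month: last month (inclusive) to fill up to.
    Each record is propagated until the month before the next change,
    or until end_month for the last record.
    """
    def next_month(ym):
        y, m = ym
        return (y + 1, 1) if m == 12 else (y, m + 1)

    filled = []
    for i, (month, status, price) in enumerate(records):
        # propagate until the next change event (exclusive), capped at end_month
        stop = records[i + 1][0] if i + 1 < len(records) else next_month(end_month)
        cur = month
        while cur < stop and cur <= end_month:
            filled.append((cur, status, price))
            cur = next_month(cur)
    return filled
```

Trimming to the last 6 months would then be a separate slicing step per product; the Spark answer below does both at scale with window functions.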

Recommended answer

With respect to @jxc's comment, I have prepared an answer for this use case.

Below are the code snippets.

  1. Import the Spark SQL functions

    from pyspark.sql import functions as F, Window

  1. Prepare the sample data

    simpleData = [(1, "Available", 5, "2020-07"),
                  (1, "Available", 8, "2020-08"),
                  (1, "Limited", 8, "2020-12"),
                  (2, "Limited", 1, "2020-09"),
                  (2, "Limited", 3, "2020-12")]

    
    columns= ["product_id", "status", "price", "month"]

  1. Create a dataframe from the sample data

    df = spark.createDataFrame(data=simpleData, schema=columns)

  1. Add a date column to the dataframe to get a properly formatted date

    df0 = df.withColumn("date",F.to_date('month','yyyy-MM'))

    df0.show()

    +----------+---------+-----+-------+----------+                                          
    |product_id|   status|price|  month|      date|                                               
    +----------+---------+-----+-------+----------+                                                
    |         1|Available|    5|2020-07|2020-07-01|                                                 
    |         1|Available|    8|2020-08|2020-08-01|                                                
    |         1|  Limited|    8|2020-12|2020-12-01|                                                
    |         2|  Limited|    1|2020-09|2020-09-01|                                                
    |         2|  Limited|    3|2020-12|2020-12-01|                                                
    +----------+---------+-----+-------+----------+

  1. Create a WindowSpec w1 and use the window function lead over (w1) to find the next date; shift it back one month with add_months to set up the end of each date sequence:

    w1 = Window.partitionBy('product_id').orderBy('date')
    df1 = df0.withColumn('end_date',F.coalesce(F.add_months(F.lead('date').over(w1),-1),'date'))
    df1.show()

    +----------+---------+-----+-------+----------+----------+                                                                  
    |product_id|   status|price|  month|      date|  end_date|                                                      
    +----------+---------+-----+-------+----------+----------+                                              
    |         1|Available|    5|2020-07|2020-07-01|2020-07-01|                                                      
    |         1|Available|    8|2020-08|2020-08-01|2020-11-01|                                                            
    |         1|  Limited|    8|2020-12|2020-12-01|2020-12-01|                                                                     
    |         2|  Limited|    1|2020-09|2020-09-01|2020-11-01|                                                                            
    |         2|  Limited|    3|2020-12|2020-12-01|2020-12-01|                                                                                   
    +----------+---------+-----+-------+----------+----------+

  1. Use months_between(end_date, date) to calculate the number of months between the two dates, then use the transform function to iterate over sequence(0, #months), creating a named_struct with date = add_months(date, i) and price = IF(i=0, price, price); finally, use inline_outer to explode the resulting array of structs.

    df2 = df1.selectExpr(
        "product_id",
        "status",
        "inline_outer(transform(sequence(0, int(months_between(end_date, date)), 1), i -> (add_months(date, i) as date, IF(i=0, price, price) as price)))"
    )

    df2.show()

    +----------+---------+----------+-----+                                                    
    |product_id|   status|      date|price|                                                             
    +----------+---------+----------+-----+                                                              
    |         1|Available|2020-07-01|    5|                                                              
    |         1|Available|2020-08-01|    8|                                                  
    |         1|Available|2020-09-01|    8|                                                           
    |         1|Available|2020-10-01|    8|                                                             
    |         1|Available|2020-11-01|    8|                                                                 
    |         1|  Limited|2020-12-01|    8|                                                                
    |         2|  Limited|2020-09-01|    1|                                                                                 
    |         2|  Limited|2020-10-01|    1|                                                    
    |         2|  Limited|2020-11-01|    1|                                                                          
    |         2|  Limited|2020-12-01|    3|                                                          
    +----------+---------+----------+-----+                    

  1. Partition the dataframe on product_id and add a rank column in df3 to get the row number for each row. Then store the maximum rank value per product_id in a new column max_rank and save it into df4

    w2 = Window.partitionBy('product_id').orderBy('date')
    df3 = df2.withColumn('rank', F.row_number().over(w2))
    # Schema: DataFrame[product_id: bigint, status: string, date: date, price: bigint, rank: int]
    df3.show()
    +----------+---------+----------+-----+----+
    |product_id|   status|      date|price|rank|
    +----------+---------+----------+-----+----+
    |         1|Available|2020-07-01|    5|   1|
    |         1|Available|2020-08-01|    8|   2|
    |         1|Available|2020-09-01|    8|   3|
    |         1|Available|2020-10-01|    8|   4|
    |         1|Available|2020-11-01|    8|   5|
    |         1|  Limited|2020-12-01|    8|   6|
    |         2|  Limited|2020-09-01|    1|   1|
    |         2|  Limited|2020-10-01|    1|   2|
    |         2|  Limited|2020-11-01|    1|   3|
    |         2|  Limited|2020-12-01|    3|   4|
    +----------+---------+----------+-----+----+ 

                                                                                                           
    df4 = df3.groupBy("product_id").agg(F.max('rank').alias('max_rank'))
    # Schema: DataFrame[product_id: bigint, max_rank: int]
    df4.show()
    +----------+--------+
    |product_id|max_rank|
    +----------+--------+
    |         1|       6|
    |         2|       4|
    +----------+--------+

  1. Join the df3 and df4 dataframes on product_id to get max_rank

    df5 = df3.join(df4, df3.product_id == df4.product_id, "inner") \
             .select(df3.product_id, df3.status, df3.date, df3.price, df3.rank, df4.max_rank)
    # Schema: DataFrame[product_id: bigint, status: string, date: date, price: bigint, rank: int, max_rank: int]
    df5.show()
    +----------+---------+----------+-----+----+--------+
    |product_id|   status|      date|price|rank|max_rank|
    +----------+---------+----------+-----+----+--------+
    |         1|Available|2020-07-01|    5|   1|       6|
    |         1|Available|2020-08-01|    8|   2|       6|
    |         1|Available|2020-09-01|    8|   3|       6|
    |         1|Available|2020-10-01|    8|   4|       6|
    |         1|Available|2020-11-01|    8|   5|       6|
    |         1|  Limited|2020-12-01|    8|   6|       6|
    |         2|  Limited|2020-09-01|    1|   1|       4|
    |         2|  Limited|2020-10-01|    1|   2|       4|
    |         2|  Limited|2020-11-01|    1|   3|       4|
    |         2|  Limited|2020-12-01|    3|   4|       4|
    +----------+---------+----------+-----+----+--------+

  1. Finally, filter the df5 dataframe using the between function to get the latest 6 months of data.

    # keep the latest 6 ranks (months) per product: ranks max_rank-5 .. max_rank
    FinalResultDF = df5.filter(F.col('rank').between(F.col('max_rank') - 5, F.col('max_rank'))) \
                       .select(df5.product_id, df5.status, df5.date, df5.price)

    FinalResultDF.show(truncate=False)   

    +----------+---------+----------+-----+                                               
    |product_id|status   |date      |price|                                                
    +----------+---------+----------+-----+                                                               
    |1         |Available|2020-07-01|5    |                                                                                
    |1         |Available|2020-08-01|8    |                                                                                          
    |1         |Available|2020-09-01|8    |                                                                                                           
    |1         |Available|2020-10-01|8    |                                                                                                             
    |1         |Available|2020-11-01|8    |                                                                                                               
    |1         |Limited  |2020-12-01|8    |                                                                                                                     
    |2         |Limited  |2020-09-01|1    |                                                                                                                     
    |2         |Limited  |2020-10-01|1    |                                                                                                                        
    |2         |Limited  |2020-11-01|1    |                                                                                                                      
    |2         |Limited  |2020-12-01|3    |                                                                                                         
    +----------+---------+----------+-----+
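As a sanity check, the "keep the latest 6 months per product" rule applied by the final filter can be mirrored in plain Python (a hypothetical helper, assuming rows are already sorted by date within each product, as the rank column guarantees above):

```python
from collections import defaultdict

def last_n_months(rows, n=6):
    """Keep at most the last n rows per product_id.

    rows: list of dicts with at least a "product_id" key, already
    sorted by date within each product (mirroring the rank filter).
    """
    by_product = defaultdict(list)
    for row in rows:
        by_product[row["product_id"]].append(row)
    kept = []
    for pid in sorted(by_product):
        kept.extend(by_product[pid][-n:])  # last n entries = latest n months
    return kept
```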
