Pyspark: how to solve complicated dataframe logic plus join


Problem description


I have two data frames to work on; the first one, df1, looks like the following:

from pyspark.sql import functions as F
from pyspark.sql.functions import col, expr, to_date
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df1_schema = StructType([StructField("Date", StringType(), True),
                         StructField("store_id", StringType(), True),
                         StructField("warehouse_id", StringType(), True),
                         StructField("class_id", StringType(), True),
                         StructField("total_time", IntegerType(), True)])
df_data = [('2020-08-01','110','1','11010',3),('2020-08-02','110','1','11010',2),
           ('2020-08-03','110','1','11010',3),('2020-08-04','110','1','11010',3),
           ('2020-08-05','111','1','11010',1),('2020-08-06','111','1','11010',-1)]
df1 = sqlContext.createDataFrame(df_data, df1_schema)
df1 = df1.withColumn("Date", to_date("Date", 'yyyy-MM-dd'))
df1.show()

+----------+--------+------------+--------+----------+
|      Date|store_id|warehouse_id|class_id|total_time|
+----------+--------+------------+--------+----------+
|2020-08-01|     110|           1|   11010|         3|
|2020-08-02|     110|           1|   11010|         2|
|2020-08-03|     110|           1|   11010|         3|
|2020-08-04|     110|           1|   11010|         3|
|2020-08-05|     111|           1|   11010|         1|
|2020-08-06|     111|           1|   11010|        -1|
+----------+--------+------------+--------+----------+

I calculated something called arrival_date

# To calculate the arrival_date
# logic: add Date + total_time, so in the first row 2020-08-01 + 3 gives 2020-08-04
# if total_time is -1 then return blank
df1 = df1.withColumn('arrival_date', F.when(col('total_time') != -1, expr("date_add(Date, total_time)"))
         .otherwise(''))
+----------+--------+------------+--------+----------+------------+
|      Date|store_id|warehouse_id|class_id|total_time|arrival_date|
+----------+--------+------------+--------+----------+------------+
|2020-08-01|     110|           1|   11010|         3|  2020-08-04|
|2020-08-02|     110|           1|   11010|         2|  2020-08-04|
|2020-08-03|     110|           1|   11010|         3|  2020-08-06|
|2020-08-04|     110|           1|   11010|         3|  2020-08-07|
|2020-08-05|     111|           1|   11010|         1|  2020-08-06|
|2020-08-06|     111|           1|   11010|        -1|            |
+----------+--------+------------+--------+----------+------------+

and what I want to calculate is this:

# To calculate the transit_date:
# if arrival_date repeats, e.g. 2020-08-04 appears 2 or more times, then take min("Date"),
# which is 2020-08-01; otherwise just return the Date, e.g. arrival_date 2020-08-07 just returns 2020-08-04.
# We need to care about cloth_id too; we have arrival_date = 2020-08-06 repeated 2 times as well, but
# if one of store_id or warehouse_id is different we treat them separately, so at arrival_date = 2020-08-06 with Date = 2020-08-03
# we must return 2020-08-03.
# In other words, we treat rows separately when one of (store_id, warehouse_id) is different.
# *Note* we don't care about class_id, it's not effective.
# if arrival_date = blank then leave it as blank.
# so our df would look something like this:
+----------+--------+------------+--------+----------+------------+------------+
|      Date|store_id|warehouse_id|class_id|total_time|arrival_date|transit_date|
+----------+--------+------------+--------+----------+------------+------------+
|2020-08-01|     110|           1|   11010|         3|  2020-08-04|  2020-08-01|
|2020-08-02|     110|           1|   11010|         2|  2020-08-04|  2020-08-01|
|2020-08-03|     110|           1|   11010|         3|  2020-08-06|  2020-08-03|
|2020-08-04|     110|           1|   11010|         3|  2020-08-07|  2020-08-04|
|2020-08-05|     111|           1|   11010|         1|  2020-08-06|  2020-08-05|
|2020-08-06|     111|           1|   11010|        -1|            |            |
+----------+--------+------------+--------+----------+------------+------------+
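(For reference, one way to produce exactly this transit_date column is a window minimum; this is a minimal sketch, assuming the grouping keys are arrival_date, store_id and warehouse_id as described in the comments above:)

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# rows sharing the same (arrival_date, store_id, warehouse_id) form one group;
# the earliest Date in the group becomes the transit_date for every row in it
w = Window.partitionBy('arrival_date', 'store_id', 'warehouse_id')
df1 = df1.withColumn('transit_date',
                     F.when(F.col('total_time') != -1, F.min('Date').over(w)).otherwise(''))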

Next, my df2 looks like the following:

# we have another dataframe, call it df2
df2_schema = StructType([StructField("Date", StringType(), True),
                         StructField("store_id", StringType(), True),
                         StructField("warehouse_id", StringType(), True),
                         StructField("cloth_id", StringType(), True),
                         StructField("class_id", StringType(), True),
                         StructField("type", StringType(), True),
                         StructField("quantity", IntegerType(), True)])
df_data = [('2020-08-01','110','1','M_1','11010','R',5),('2020-08-01','110','1','M_1','11010','R',2),
           ('2020-08-02','110','1','M_1','11010','C',3),('2020-08-03','110','1','M_1','11010','R',1),
           ('2020-08-04','110','1','M_1','11010','R',3),('2020-08-05','111','1','M_2','11010','R',5)]
df2 = sqlContext.createDataFrame(df_data, df2_schema)
df2 = df2.withColumn("Date", to_date("Date", 'yyyy-MM-dd'))
df2.show()

+----------+--------+------------+--------+--------+----+--------+
|      Date|store_id|warehouse_id|cloth_id|class_id|type|quantity|
+----------+--------+------------+--------+--------+----+--------+
|2020-08-01|     110|           1|     M_1|   11010|   R|       5|
|2020-08-01|     110|           1|     M_1|   11010|   R|       2|
|2020-08-02|     110|           1|     M_1|   11010|   C|       3|
|2020-08-03|     110|           1|     M_1|   11010|   R|       1|
|2020-08-04|     110|           1|     M_1|   11010|   R|       3|
|2020-08-05|     111|           1|     M_2|   11010|   R|       5|
+----------+--------+------------+--------+--------+----+--------+

and I calculated quantity2; this is just the sum of quantity where type = R:

df2 = df2.groupBy('Date','store_id','warehouse_id','cloth_id','class_id')\
      .agg(F.sum(F.when(col('type')=='R', col('quantity'))\
      .otherwise(col('quantity'))).alias('quantity2')).orderBy('Date')
+----------+--------+------------+--------+--------+---------+
|      Date|store_id|warehouse_id|cloth_id|class_id|quantity2|
+----------+--------+------------+--------+--------+---------+
|2020-08-01|     110|           1|     M_1|   11010|        7|
|2020-08-02|     110|           1|     M_1|   11010|        3|
|2020-08-03|     110|           1|     M_1|   11010|        1|
|2020-08-04|     110|           1|     M_1|   11010|        3|
|2020-08-05|     111|           1|     M_2|   11010|        5|
+----------+--------+------------+--------+--------+---------+
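Side note: as written, the otherwise branch also adds non-R quantities, which is why the 2020-08-02 row (type C) still shows quantity2 = 3. If only type = R rows should really count, the otherwise would presumably be zeroed instead; a hypothetical variant, applied to the original df2 before the aggregation above:

# hypothetical: count only type == 'R' rows toward quantity2
df2_r_only = df2.groupBy('Date','store_id','warehouse_id','cloth_id','class_id')\
      .agg(F.sum(F.when(col('type')=='R', col('quantity')).otherwise(0)).alias('quantity2'))\
      .orderBy('Date')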

Now I have df1 and df2. I want to join them so that the result looks something like this... I tried something like the following:

df4 = df1.select('store_id','warehouse_id','class_id','arrival_date','transit_date')
df4 = df4.filter(" transit_date != '' ")

df4 = df4.withColumnRenamed('arrival_date', 'date')

df3 = df2.join(df1, on=['Date','store_id','warehouse_id','class_id'], how='inner').orderBy('Date')
df5 = df3.join(df4, on=['Date','store_id','warehouse_id','class_id'], how='left').orderBy('Date')

but I don't think this is the correct approach... The resulting df should look like the following:

+----------+--------+------------+--------+--------+---------+----------+------------+------------+
|      Date|store_id|warehouse_id|class_id|cloth_id|quantity2|total_time|arrival_date|transit_date|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+
|2020-08-01|     110|           1|   11010|     M_1|        7|         3|  2020-08-04|        null|
|2020-08-02|     110|           1|   11010|     M_1|        3|         2|  2020-08-04|        null|
|2020-08-03|     110|           1|   11010|     M_1|        1|         3|  2020-08-06|        null|
|2020-08-04|     110|           1|   11010|     M_1|        3|         3|  2020-08-07|  2020-08-01|
|2020-08-05|     111|           1|   11010|     M_2|        5|         1|  2020-08-06|        null|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+

Note that transit_date went to where Date = arrival_date; of course, the null is replaced by blank.

LASTLY, if today is 2020-08-04, then look at where arrival_date == 2020-08-04, sum up the quantity, and place it at today. Where store_id = 111, it will have a separate date (not shown here), so the logic needs to make sense when store_id = 111 as well; I've just shown the example where store_id = 110.

Solution

From my understanding of your question and what you already have, with the following df1 and df2:

df1.orderBy('Date').show()                                           df2.orderBy('Date').show()
+----------+--------+------------+--------+----------+------------+  +----------+--------+------------+--------+--------+---------+
|      Date|store_id|warehouse_id|class_id|total_time|arrival_date|  |      Date|store_id|warehouse_id|cloth_id|class_id|quantity2|
+----------+--------+------------+--------+----------+------------+  +----------+--------+------------+--------+--------+---------+
|2020-08-01|     110|           1|   11010|         3|  2020-08-04|  |2020-08-01|     110|           1|     M_1|   11010|        7|
|2020-08-02|     110|           1|   11010|         2|  2020-08-04|  |2020-08-02|     110|           1|     M_1|   11010|        3|
|2020-08-03|     110|           1|   11010|         3|  2020-08-06|  |2020-08-03|     110|           1|     M_1|   11010|        1|
|2020-08-04|     110|           1|   11010|         3|  2020-08-07|  |2020-08-04|     110|           1|     M_1|   11010|        3|
|2020-08-05|     111|           1|   11010|         1|  2020-08-06|  |2020-08-05|     111|           1|     M_2|   11010|        5|
|2020-08-06|     111|           1|   11010|        -1|            |  +----------+--------+------------+--------+--------+---------+
+----------+--------+------------+--------+----------+------------+

you can try the following 5 steps:

Step-1: Set up the list of column names grp_cols for the joins:

from pyspark.sql import functions as F
grp_cols = ["Date", "store_id", "warehouse_id", "class_id"]

Step-2: Create df3 containing transit_date, which is the min Date for each combination of arrival_date, store_id, warehouse_id and class_id:

df3 = df1.filter('total_time != -1') \
    .groupby("arrival_date", "store_id", "warehouse_id", "class_id") \
    .agg(F.min('Date').alias('transit_date')) \
    .withColumnRenamed("arrival_date", "Date")

df3.orderBy('Date').show()
+----------+--------+------------+--------+------------+
|      Date|store_id|warehouse_id|class_id|transit_date|
+----------+--------+------------+--------+------------+
|2020-08-04|     110|           1|   11010|  2020-08-01|
|2020-08-06|     111|           1|   11010|  2020-08-05|
|2020-08-06|     110|           1|   11010|  2020-08-03|
|2020-08-07|     110|           1|   11010|  2020-08-04|
+----------+--------+------------+--------+------------+
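The rename from arrival_date back to Date is the key move here: after the join in the next step, transit_date lands exactly on the rows whose Date equals some arrival_date, which matches the expected output in the question.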

Step-3: Set up df4 by joining df2 with df1, then left-joining df3 using grp_cols, and persist df4:

df4 = df2.join(df1, grp_cols).join(df3, grp_cols, "left") \
    .withColumn('transit_date', F.when(F.col('total_time') != -1, F.col("transit_date")).otherwise('')) \
    .persist()
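# count() is an action: it forces the persisted df4 to materialize before it is reused in the joins below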
_ = df4.count()
df4.orderBy('Date').show()
+----------+--------+------------+--------+--------+---------+----------+------------+------------+
|      Date|store_id|warehouse_id|class_id|cloth_id|quantity2|total_time|arrival_date|transit_date|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+
|2020-08-01|     110|           1|   11010|     M_1|        7|         3|  2020-08-04|        null|
|2020-08-02|     110|           1|   11010|     M_1|        3|         2|  2020-08-04|        null|
|2020-08-03|     110|           1|   11010|     M_1|        1|         3|  2020-08-06|        null|
|2020-08-04|     110|           1|   11010|     M_1|        3|         3|  2020-08-07|  2020-08-01|
|2020-08-05|     111|           1|   11010|     M_2|        5|         1|  2020-08-06|        null|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+

Step-4: Calculate sum(quantity2) as want from df4 for each arrival_date + store_id + warehouse_id + class_id + cloth_id:

df5 = df4 \
    .groupby("arrival_date", "store_id", "warehouse_id", "class_id", "cloth_id") \
    .agg(F.sum("quantity2").alias("want")) \
    .withColumnRenamed("arrival_date", "Date")
df5.orderBy('Date').show()
+----------+--------+------------+--------+--------+----+
|      Date|store_id|warehouse_id|class_id|cloth_id|want|
+----------+--------+------------+--------+--------+----+
|2020-08-04|     110|           1|   11010|     M_1|  10|
|2020-08-06|     111|           1|   11010|     M_2|   5|
|2020-08-06|     110|           1|   11010|     M_1|   1|
|2020-08-07|     110|           1|   11010|     M_1|   3|
+----------+--------+------------+--------+--------+----+
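For example, want = 10 on 2020-08-04 is 7 + 3, i.e. the quantity2 of the two rows (Date 2020-08-01 and 2020-08-02) whose arrival_date is 2020-08-04.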

Step-5: Create the final dataframe by left-joining df4 with df5:

df_new = df4.join(df5, grp_cols+["cloth_id"], "left").fillna(0, subset=['want'])
df_new.orderBy("Date").show()
+----------+--------+------------+--------+--------+---------+----------+------------+------------+----+
|      Date|store_id|warehouse_id|class_id|cloth_id|quantity2|total_time|arrival_date|transit_date|want|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+----+
|2020-08-01|     110|           1|   11010|     M_1|        7|         3|  2020-08-04|        null|   0|
|2020-08-02|     110|           1|   11010|     M_1|        3|         2|  2020-08-04|        null|   0|
|2020-08-03|     110|           1|   11010|     M_1|        1|         3|  2020-08-06|        null|   0|
|2020-08-04|     110|           1|   11010|     M_1|        3|         3|  2020-08-07|  2020-08-01|  10|
|2020-08-05|     111|           1|   11010|     M_2|        5|         1|  2020-08-06|        null|   0|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+----+
df4.unpersist()
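If you only need the figure for a single day (the LASTLY part of the question), one option is to filter df_new on that date; a small sketch, where today is a hypothetical literal:

# hypothetical: pull the summed quantity for one specific day
today = '2020-08-04'
df_new.filter(F.col('Date') == today)\
      .select('Date', 'store_id', 'warehouse_id', 'cloth_id', 'want')\
      .show()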
