Pyspark:如何解决复杂的数据框逻辑加联接 [英] Pyspark: how to solve complicated dataframe logic plus join
问题描述
我有两个要处理的数据框,第一个看起来像下面的 df1
df1_schema = StructType([StructField("Date"",StringType(),True),\StructField("store_id",StringType(),True),\StructField("warehouse_id",StringType(),True),\StructField("class_id",StringType(),True),\StructField("total_time",IntegerType(),True)])df_data = [('2020-08-01','110','1','11010',3),('2020-08-02','110','1','11010',2),\('2020-08-03','110','1','11010',3),('2020-08-04','110','1','11010',3),\('2020-08-05','111','1','11010',1),(''2020-08-06','111','1','11010',-1)]rdd = sc.parallelize(df_data)df1 = sqlContext.createDataFrame(df_data,df1_schema)df1 = df1.withColumn("Date",to_date("Date",'yyyy-MM-dd'))df1.show()+ ---------- + -------- + ------------ + -------- + --------+|日期|商店ID |仓库ID |类别ID |总时间|+ ---------- + -------- + ------------ + -------- + --------+| 2020-08-01 |110 |1 |11010 |3 || 2020-08-02 |110 |1 |11010 |2 || 2020-08-03 |110 |1 |11010 |3 || 2020-08-04 |110 |1 |11010 |3 || 2020-08-05 |111 |1 |11010 |1 || 2020-08-06 |111 |1 |11010 |-1 |+ ---------- + -------- + ------------ + -------- + --------+
我计算出一个叫做到达日期
的东西 #计算到达日期#logic:添加日期+ total_time,因此在第一行中,2020-08-01 +3会给我2020-08-04#如果total_time为-1,则返回空白df1 = df1.withColumn('arrival_date',F.when(col('total_time')!= -1,expr("date_add(date,total_time)")).否则(''))+ ---------- + -------- + ------------ + -------- + ---------- + ------------ +|日期|商店ID |仓库ID |类别ID |总时间|到达日期|+ ---------- + -------- + ------------ + -------- + ---------- + ------------ +| 2020-08-01 |110 |1 |11010 |3 |2020-08-04 || 2020-08-02 |110 |1 |11010 |2 |2020-08-04 || 2020-08-03 |110 |1 |11010 |3 |2020-08-06 || 2020-08-04 |110 |1 |11010 |3 |2020-08-07 || 2020-08-05 |111 |1 |11010 |1 |2020-08-06 || 2020-08-06 |111 |1 |11010 |-1 ||+ ---------- + -------- + ------------ + -------- + ---------- + ------------ +
我要计算的是这个.
#计算运输日期#如果到达日期相同,例如)2020-08-04重复2次或更多次,则取min("Date")#将会是2020-08-01,否则只返回日期,例如2020-08-07只会返回2020-08-04#我们也需要注意cloth_id,我们也将到达日期= 2020-08-06重复了两次,但是#如果store_id或Warehouse_id之一不同,我们将分别对待它们.因此,在到达日期= 2020-08-06,日期= 2020-08-03,##我们必须返回2020-08-03#因此,当(store_id,warehouse_id之一)不同时,我们将分别对待它们.#*注意*我们不在乎class_id,它无效.#if入住日期=空白,然后将其保留为空白.#所以我们的df看起来像这样.+ ---------- + -------- + ------------ + -------- + ---------- + ------------ + ------------ +|日期|商店ID |仓库ID |类别ID |总时间|到达日期|过境日期|+ ---------- + -------- + ------------ + -------- + ---------- + ------------ + ------------ +| 2020-08-01 |110 |1 |11010 |3 |2020-08-04 |2020-08-01 || 2020-08-02 |110 |1 |11010 |2 |2020-08-04 |2020-08-01 || 2020-08-03 |110 |1 |11010 |3 |2020-08-06 |2020-08-03 || 2020-08-04 |110 |1 |11010 |3 |2020-08-07 |2020-08-04 || 2020-08-05 |111 |1 |11010 |1 |2020-08-06 |2020-08-05 || 2020-08-06 |111 |1 |11010 |-1 |||+ ---------- + -------- + ------------ + -------- + ---------- + ------------ + ------------ +
接下来,我的 df2 如下所示.
#我们有另一个名为df2的数据框df2_schema = StructType([StructField("Date"",StringType(),True),\StructField("store_id",StringType(),True),\StructField("warehouse_id",StringType(),True),\StructField("cloth_id",StringType(),True),\StructField("class_id",StringType(),True),\StructField("type",StringType(),True),\StructField("quantity",IntegerType(),True)])df_data = [('2020-08-01','110','1','M_1','11010','R',5),('2020-08-01','110','1','M_1','11010','R',2),\('2020-08-02','110','1','M_1','11010','C',3),('2020-08-03','110','1','M_1','11010','R',1),\('2020-08-04','110','1','M_1','11010','R',3),('2020-08-05','111','1','M_2','11010','R',5)]rdd = sc.parallelize(df_data)df2 = sqlContext.createDataFrame(df_data,df2_schema)df2 = df2.withColumn("Date",to_date("Date",'yyyy-MM-dd'))df2.show()+ ---------- + -------- + ------------ + -------- + --------+ ---- + -------- +|日期|商店ID |仓库ID |衣服ID |类别ID |类型|数量|+ ---------- + -------- + ------------ + -------- + --------+ ---- + -------- +| 2020-08-01 |110 |1 |M_1 |11010 |R |5 || 2020-08-01 |110 |1 |M_1 |11010 |R |2 || 2020-08-02 |110 |1 |M_1 |11010 |C |3 || 2020-08-03 |110 |1 |M_1 |11010 |R |1 || 2020-08-04 |110 |1 |M_1 |11010 |R |3 || 2020-08-05 |111 |1 |M_2 |11010 |R |5 |+ ---------- + -------- + ------------ + -------- + --------+ ---- + -------- +
我计算了 quantity2 ,这只是type = R
的数量之和 df2 = df2.groupBy('Date','store_id','warehouse_id','cloth_id','class_id')\.agg(F.sum(F.when(col('type')=='R',col('quantity'))\.otherwise(col('quantity'))).alias('quantity2')).orderBy('Date')+ ---------- + -------- + ------------ + -------- + --------+ --------- +|日期|商店ID |仓库ID |衣服ID |类别ID |数量2 |+ ---------- + -------- + ------------ + -------- + --------+ --------- +| 2020-08-01 |110 |1 |M_1 |11010 |7 || 2020-08-02 |110 |1 |M_1 |11010 |3 || 2020-08-03 |110 |1 |M_1 |11010 |1 || 2020-08-04 |110 |1 |M_1 |11010 |3 || 2020-08-05 |111 |1 |M_2 |11010 |5 |+ ---------- + -------- + ------------ + -------- + --------+ --------- +
现在我有df1和df2.我想加入,这样它将看起来像这样...我尝试过这样的事情
df4 = df1.select('store_id','warehouse_id','class_id','arrival_date','transit_date')df4 = df4.filter(" transition_date!=``")df4 = df4.withColumnRenamed('arrival_date','date')df3 = df2.join(df1,on = ['Date','store_id','warehouse_id','class_id'],how ='inner').orderBy('Date')df5 = df3.join(df4,on = ['Date','store_id','warehouse_id','class_id'],how ='left').orderBy('Date')
但是我不认为这是正确的方法....结果df应该如下所示.
+ ---------- + -------- + ------------ + -------- +-------- + --------- + ---------- + ------------ + ------------ +|日期|商店ID |仓库ID |类别ID |衣服ID |数量2 |总时间|到达日期|过境日期|+ ---------- + -------- + ------------ + -------- + --------+ --------- + ---------- + ------------ + ------------ +| 2020-08-01 |110 |1 |11010 |M_1 |7 |3 |2020-08-04 |null || 2020-08-02 |110 |1 |11010 |M_1 |3 |2 |2020-08-04 |null || 2020-08-03 |110 |1 |11010 |M_1 |1 |3 |2020-08-06 |null || 2020-08-04 |110 |1 |11010 |M_1 |3 |3 |2020-08-07 |2020-08-01 || 2020-08-05 |111 |1 |11010 |M_2 |5 |1 |2020-08-06 |null |+ ---------- + -------- + ------------ + -------- + --------+ --------- + ---------- + ------------ + ------------ +
请注意,transit_date到达了 Date = arrival_date
的位置,当然,空值会被替换为空白.
最后,如果今天是2020-08-04,请查看where到达日期== 2020-08-04,并对数量求和并将其放在今天.所以....看起来像这样... store_id = 111,它将有单独的日期.这里没有显示..所以当store_id = 111时逻辑也必须有意义..我刚刚显示了store_id = 110的示例
根据我对您的问题的理解,以及以下 df1
和 df2
所在的位置:
df1.orderBy('Date').show()df2.orderBy('Date').show()+ ---------- + -------- + ------------ + -------- + ---------- + ------------ + + ---------- + -------- + ------------+ -------- + -------- + --------- +|日期|商店ID |仓库ID |类别ID |总时间|到达日期||日期|商店ID |仓库ID |衣服ID |类别ID |数量2 |+ ---------- + -------- + ------------ + -------- + ---------- + ------------ + + ---------- + -------- + ------------+ -------- + -------- + --------- +| 2020-08-01 |110 |1 |11010 |3 |2020-08-04 || 2020-08-01 |110 |1 |M_1 |11010 |7 || 2020-08-02 |110 |1 |11010 |2 |2020-08-04 || 2020-08-02 |110 |1 |M_1 |11010 |3 || 2020-08-03 |110 |1 |11010 |3 |2020-08-06 || 2020-08-03 |110 |1 |M_1 |11010 |1 || 2020-08-04 |110 |1 |11010 |3 |2020-08-07 || 2020-08-04 |110 |1 |M_1 |11010 |3 || 2020-08-05 |111 |1 |11010 |1 |2020-08-06 || 2020-08-05 |111 |1 |M_2 |11010 |5 || 2020-08-06 |111 |1 |11010 |-1 ||+ ---------- + -------- + ------------ + -------- + --------+ --------- ++ ---------- + -------- + ------------ + -------- + ---------- + ------------ +
您可以尝试以下5个步骤:
步骤1::设置要连接的列名称列表 grp_cols
:
导入功能为Fgrp_cols = [日期","store_id","warehouse_id","class_id"]
第2步::创建包含 transit_date
的df3,该日期是 arrival_date
, store_id
,仓库编号
和 class_id
:
df3 = df1.filter('total_time!= -1')\.groupby("arrival_date","store_id","warehouse_id","class_id")\.agg(F.min('Date').alias('transit_date'))\.withColumnRenamed("arrival_date","Date")df3.orderBy('Date').show()+ ---------- + -------- + ------------ + -------- + ------------ +|日期|商店ID |仓库ID |类别ID |运输日期|+ ---------- + -------- + ------------ + -------- + ------------ +| 2020-08-04 |110 |1 |11010 |2020-08-01 || 2020-08-06 |111 |1 |11010 |2020-08-05 || 2020-08-06 |110 |1 |11010 |2020-08-03 || 2020-08-07 |110 |1 |11010 |2020-08-04 |+ ---------- + -------- + ------------ + -------- + ------------ +
第3步::通过将df2与df1联接来设置df4,然后使用grp_cols左键联接df3,并保持df4
df4 = df2.join(df1,grp_cols).join(df3,grp_cols,"left")\.withColumn('transit_date',F.when(F.col('total_time')!= -1,F.col("transit_date")).otherwise(''))\.坚持()_ = df4.count()df4.orderBy('Date').show()+ ---------- + -------- + ------------ + -------- + --------+ --------- + ---------- + ------------ + ------------ +|日期|商店ID |仓库ID |类别ID |衣服ID |数量2 |总时间|到达日期|过境日期|+ ---------- + -------- + ------------ + -------- + --------+ --------- + ---------- + ------------ + ------------ +| 2020-08-01 |110 |1 |11010 |M_1 |7 |3 |2020-08-04 |null || 2020-08-02 |110 |1 |11010 |M_1 |3 |2 |2020-08-04 |null || 2020-08-03 |110 |1 |11010 |M_1 |1 |3 |2020-08-06 |null || 2020-08-04 |110 |1 |11010 |M_1 |3 |3 |2020-08-07 |2020-08-01 || 2020-08-05 |111 |1 |11010 |M_2 |5 |1 |2020-08-06 |null |+ ---------- + -------- + ------------ + -------- + --------+ --------- + ---------- + ------------ + ------------ +
步骤4 :对于每个到达日期
+ store_id
+,从df4中根据需要计算 sum(quantity2)
warehouse_id
+ class_id
+ cloth_id
df5 = df4 \.groupby("arrival_date","store_id","warehouse_id","class_id","cloth_id")\.agg(F.sum("quantity2").alias("want"))\.withColumnRenamed("arrival_date","Date")df5.orderBy('Date').show()+ ---------- + -------- + ------------ + -------- + --------+ ---- +|日期|商店ID |仓库ID |类别ID |衣服ID |想|+ ---------- + -------- + ------------ + -------- + --------+ ---- +| 2020-08-04 |110 |1 |11010 |M_1 |10 || 2020-08-06 |111 |1 |11010 |M_2 |5 || 2020-08-06 |110 |1 |11010 |M_1 |1 || 2020-08-07 |110 |1 |11010 |M_1 |3 |+ ---------- + -------- + ------------ + -------- + --------+ ---- +
第5步::通过将df4与df5左连接来创建最终数据框
df_new = df4.join(df5,grp_cols + ["cloth_id"],"left").fillna(0,子集= ['want'])df_new.orderBy(日期").show()+ ---------- + -------- + ------------ + -------- + --------+ --------- + ---------- + ------------ + ------------ +---- +|日期|商店ID |仓库ID |类别ID |衣服ID |数量2 |总时间|到达日期|过境日期|希望|+ ---------- + -------- + ------------ + -------- + --------+ --------- + ---------- + ------------ + ------------ +--+| 2020-08-01 |110 |1 |11010 |M_1 |7 |3 |2020-08-04 |null |0 || 2020-08-02 |110 |1 |11010 |M_1 |3 |2 |2020-08-04 |null |0 || 2020-08-03 |110 |1 |11010 |M_1 |1 |3 |2020-08-06 |null |0 || 2020-08-04 |110 |1 |11010 |M_1 |3 |3 |2020-08-07 |2020-08-01 |10 || 2020-08-05 |111 |1 |11010 |M_2 |5 |1 |2020-08-06 |null |0 |+ ---------- + -------- + ------------ + -------- + --------+ --------- + ---------- + ------------ + ------------ +--+df4.unpersist()
I have two data frames to work on, the first one looks like this the following df1
df1_schema = StructType([StructField("Date", StringType(), True),\
StructField("store_id", StringType(), True),\
StructField("warehouse_id", StringType(), True),\
StructField("class_id", StringType(), True) ,\
StructField("total_time", IntegerType(), True) ])
df_data = [('2020-08-01','110','1','11010',3),('2020-08-02','110','1','11010',2),\
('2020-08-03','110','1','11010',3),('2020-08-04','110','1','11010',3),\
('2020-08-05','111','1','11010',1),('2020-08-06','111','1','11010',-1)]
rdd = sc.parallelize(df_data)
df1 = sqlContext.createDataFrame(df_data, df1_schema)
df1 = df1.withColumn("Date",to_date("Date", 'yyyy-MM-dd'))
df1.show()
+----------+--------+------------+--------+----------+
| Date|store_id|warehouse_id|class_id|total_time|
+----------+--------+------------+--------+----------+
|2020-08-01| 110| 1| 11010| 3|
|2020-08-02| 110| 1| 11010| 2|
|2020-08-03| 110| 1| 11010| 3|
|2020-08-04| 110| 1| 11010| 3|
|2020-08-05| 111| 1| 11010| 1|
|2020-08-06| 111| 1| 11010| -1|
+----------+--------+------------+--------+----------+
I calculated something called arrival_date
#To calculate the arrival_date
#logic : add the Date + total_time so in first row, 2020-08-01 +3 would give me 2020-08-04
#if total_time is -1 then return blank
df1= df1.withColumn('arrival_date', F.when(col('total_time') != -1, expr("date_add(date, total_time)"))
.otherwise(''))
+----------+--------+------------+--------+----------+------------+
| Date|store_id|warehouse_id|class_id|total_time|arrival_date|
+----------+--------+------------+--------+----------+------------+
|2020-08-01| 110| 1| 11010| 3| 2020-08-04|
|2020-08-02| 110| 1| 11010| 2| 2020-08-04|
|2020-08-03| 110| 1| 11010| 3| 2020-08-06|
|2020-08-04| 110| 1| 11010| 3| 2020-08-07|
|2020-08-05| 111| 1| 11010| 1| 2020-08-06|
|2020-08-06| 111| 1| 11010| -1| |
+----------+--------+------------+--------+----------+------------+
and what I want to calculate is this..
#to calculate the transit_date
#if arrival_date is same, ex) 2020-08-04 is repeated 2 or more times, then take min("Date")
#which will be 2020-08-01 otherwise just return the Date ex) 2020-08-07 would just return 2020-08-04
#we need to care about cloth_id too, we have arrival_date = 2020-08-06 repeated 2 times as well but since
#if one of store_id or warehouse_id is different we treat them separately. so at arrival_date = 2020-08-06 at date = 2020-08-03,
##we must return 2020-08-03
#so we treat them separately when one of (store_id, warehouse_id ) is different.
#*Note* we dont care about class_id, its not effective.
#if arrival_date = blank then leave it as blank..
#so our df would look something like this.
+----------+--------+------------+--------+----------+------------+------------+
| Date|store_id|warehouse_id|class_id|total_time|arrival_date|transit_date|
+----------+--------+------------+--------+----------+------------+------------+
|2020-08-01| 110| 1| 11010| 3| 2020-08-04| 2020-08-01|
|2020-08-02| 110| 1| 11010| 2| 2020-08-04| 2020-08-01|
|2020-08-03| 110| 1| 11010| 3| 2020-08-06| 2020-08-03|
|2020-08-04| 110| 1| 11010| 3| 2020-08-07| 2020-08-04|
|2020-08-05| 111| 1| 11010| 1| 2020-08-06| 2020-08-05|
|2020-08-06| 111| 1| 11010| -1| | |
+----------+--------+------------+--------+----------+------------+------------+
Next, I have df2 looks like the following..
#we have another dataframe call it df2
df2_schema = StructType([StructField("Date", StringType(), True),\
StructField("store_id", StringType(), True),\
StructField("warehouse_id", StringType(), True),\
StructField("cloth_id", StringType(), True),\
StructField("class_id", StringType(), True) ,\
StructField("type", StringType(), True),\
StructField("quantity", IntegerType(), True)])
df_data = [('2020-08-01','110','1','M_1','11010','R',5),('2020-08-01','110','1','M_1','11010','R',2),\
('2020-08-02','110','1','M_1','11010','C',3),('2020-08-03','110','1','M_1','11010','R',1),\
('2020-08-04','110','1','M_1','11010','R',3),('2020-08-05','111','1','M_2','11010','R',5)]
rdd = sc.parallelize(df_data)
df2 = sqlContext.createDataFrame(df_data, df2_schema)
df2 = df2.withColumn("Date",to_date("Date", 'yyyy-MM-dd'))
df2.show()
+----------+--------+------------+--------+--------+----+--------+
| Date|store_id|warehouse_id|cloth_id|class_id|type|quantity|
+----------+--------+------------+--------+--------+----+--------+
|2020-08-01| 110| 1| M_1| 11010| R| 5|
|2020-08-01| 110| 1| M_1| 11010| R| 2|
|2020-08-02| 110| 1| M_1| 11010| C| 3|
|2020-08-03| 110| 1| M_1| 11010| R| 1|
|2020-08-04| 110| 1| M_1| 11010| R| 3|
|2020-08-05| 111| 1| M_2| 11010| R| 5|
+----------+--------+------------+--------+--------+----+--------+
and I calculated quantity2, this is just sum of quantity where type=R
df2 =df2.groupBy('Date','store_id','warehouse_id','cloth_id','class_id')\
.agg( F.sum(F.when(col('type')=='R', col('quantity'))\
.otherwise(col('quantity'))).alias('quantity2')).orderBy('Date')
+----------+--------+------------+--------+--------+---------+
| Date|store_id|warehouse_id|cloth_id|class_id|quantity2|
+----------+--------+------------+--------+--------+---------+
|2020-08-01| 110| 1| M_1| 11010| 7|
|2020-08-02| 110| 1| M_1| 11010| 3|
|2020-08-03| 110| 1| M_1| 11010| 1|
|2020-08-04| 110| 1| M_1| 11010| 3|
|2020-08-05| 111| 1| M_2| 11010| 5|
+----------+--------+------------+--------+--------+---------+
Now I have df1, and df2. I want to join such that It will look something like this... I tried something like this
df4 = df1.select('store_id','warehouse_id','class_id','arrival_date','transit_date')
df4= df4.filter(" transit_date != '' ")
df4=df4.withColumnRenamed('arrival_date', 'date')
df3 = df2.join(df1, on=['Date','store_id','warehouse_id','class_id'],how='inner').orderBy('Date')
df5 = df3.join(df4, on=['Date','store_id','warehouse_id','class_id'], how='left').orderBy('Date')
but I dont think this is the correct approach.... the result df should look like below..
+----------+--------+------------+--------+--------+---------+----------+------------+------------+
| Date|store_id|warehouse_id|class_id|cloth_id|quantity2|total_time|arrival_date|transit_date|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+
|2020-08-01| 110| 1| 11010| M_1| 7| 3| 2020-08-04| null|
|2020-08-02| 110| 1| 11010| M_1| 3| 2| 2020-08-04| null|
|2020-08-03| 110| 1| 11010| M_1| 1| 3| 2020-08-06| null|
|2020-08-04| 110| 1| 11010| M_1| 3| 3| 2020-08-07| 2020-08-01|
|2020-08-05| 111| 1| 11010| M_2| 5| 1| 2020-08-06| null|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+
note that the transit_date went to where Date = arrival_date
of course the null is replaced by blank.
LASTLY, if today is 2020-08-04, then look at where arrival_date == 2020-08-04 and sum up the quantity and place it at today. so.... It will look like this... where the store_id = 111, it will have separate date. not shown here.. so logic needs to make sense when store_id = 111 as well.. i've just shown the example where store_id = 110
From my understanding about your question and where you already have with the following df1
and df2
:
df1.orderBy('Date').show() df2.orderBy('Date').show()
+----------+--------+------------+--------+----------+------------+ +----------+--------+------------+--------+--------+---------+
| Date|store_id|warehouse_id|class_id|total_time|arrival_date| | Date|store_id|warehouse_id|cloth_id|class_id|quantity2|
+----------+--------+------------+--------+----------+------------+ +----------+--------+------------+--------+--------+---------+
|2020-08-01| 110| 1| 11010| 3| 2020-08-04| |2020-08-01| 110| 1| M_1| 11010| 7|
|2020-08-02| 110| 1| 11010| 2| 2020-08-04| |2020-08-02| 110| 1| M_1| 11010| 3|
|2020-08-03| 110| 1| 11010| 3| 2020-08-06| |2020-08-03| 110| 1| M_1| 11010| 1|
|2020-08-04| 110| 1| 11010| 3| 2020-08-07| |2020-08-04| 110| 1| M_1| 11010| 3|
|2020-08-05| 111| 1| 11010| 1| 2020-08-06| |2020-08-05| 111| 1| M_2| 11010| 5|
|2020-08-06| 111| 1| 11010| -1| | +----------+--------+------------+--------+--------+---------+
+----------+--------+------------+--------+----------+------------+
you can try the following 5 steps:
Step-1: Set up the list of column names grp_cols
for join:
from pyspark.sql import functions as F
grp_cols = ["Date", "store_id", "warehouse_id", "class_id"]
Step-2: create df3 containing transit_date
which is the min Date on each combination of arrival_date
, store_id
, warehouse_id
and class_id
:
df3 = df1.filter('total_time != -1') \
.groupby("arrival_date", "store_id", "warehouse_id", "class_id") \
.agg(F.min('Date').alias('transit_date')) \
.withColumnRenamed("arrival_date", "Date")
df3.orderBy('Date').show()
+----------+--------+------------+--------+------------+
| Date|store_id|warehouse_id|class_id|transit_date|
+----------+--------+------------+--------+------------+
|2020-08-04| 110| 1| 11010| 2020-08-01|
|2020-08-06| 111| 1| 11010| 2020-08-05|
|2020-08-06| 110| 1| 11010| 2020-08-03|
|2020-08-07| 110| 1| 11010| 2020-08-04|
+----------+--------+------------+--------+------------+
Step-3: set up df4 by join df2 with df1 and left join df3 using grp_cols, persist df4
df4 = df2.join(df1, grp_cols).join(df3, grp_cols, "left") \
.withColumn('transit_date', F.when(F.col('total_time') != -1, F.col("transit_date")).otherwise('')) \
.persist()
_ = df4.count()
df4.orderBy('Date').show()
+----------+--------+------------+--------+--------+---------+----------+------------+------------+
| Date|store_id|warehouse_id|class_id|cloth_id|quantity2|total_time|arrival_date|transit_date|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+
|2020-08-01| 110| 1| 11010| M_1| 7| 3| 2020-08-04| null|
|2020-08-02| 110| 1| 11010| M_1| 3| 2| 2020-08-04| null|
|2020-08-03| 110| 1| 11010| M_1| 1| 3| 2020-08-06| null|
|2020-08-04| 110| 1| 11010| M_1| 3| 3| 2020-08-07| 2020-08-01|
|2020-08-05| 111| 1| 11010| M_2| 5| 1| 2020-08-06| null|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+
Step-4: calculate sum(quantity2) as want
from df4 for each arrival_date
+ store_id
+ warehouse_id
+ class_id
+ cloth_id
df5 = df4 \
.groupby("arrival_date", "store_id", "warehouse_id", "class_id", "cloth_id") \
.agg(F.sum("quantity2").alias("want")) \
.withColumnRenamed("arrival_date", "Date")
df5.orderBy('Date').show()
+----------+--------+------------+--------+--------+----+
| Date|store_id|warehouse_id|class_id|cloth_id|want|
+----------+--------+------------+--------+--------+----+
|2020-08-04| 110| 1| 11010| M_1| 10|
|2020-08-06| 111| 1| 11010| M_2| 5|
|2020-08-06| 110| 1| 11010| M_1| 1|
|2020-08-07| 110| 1| 11010| M_1| 3|
+----------+--------+------------+--------+--------+----+
Step-5: create the final dataframe by left join df4 with df5
df_new = df4.join(df5, grp_cols+["cloth_id"], "left").fillna(0, subset=['want'])
df_new.orderBy("Date").show()
+----------+--------+------------+--------+--------+---------+----------+------------+------------+----+
| Date|store_id|warehouse_id|class_id|cloth_id|quantity2|total_time|arrival_date|transit_date|want|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+----+
|2020-08-01| 110| 1| 11010| M_1| 7| 3| 2020-08-04| null| 0|
|2020-08-02| 110| 1| 11010| M_1| 3| 2| 2020-08-04| null| 0|
|2020-08-03| 110| 1| 11010| M_1| 1| 3| 2020-08-06| null| 0|
|2020-08-04| 110| 1| 11010| M_1| 3| 3| 2020-08-07| 2020-08-01| 10|
|2020-08-05| 111| 1| 11010| M_2| 5| 1| 2020-08-06| null| 0|
+----------+--------+------------+--------+--------+---------+----------+------------+------------+----+
df4.unpersist()
这篇关于Pyspark:如何解决复杂的数据框逻辑加联接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!