如何在PySpark collect_list中维护排序顺序并收集多个列表 [英] How to maintain sort order in PySpark collect_list and collect multiple lists
问题描述
我想通过对多个列使用collect_list来维护日期排序顺序,所有列都具有相同的日期顺序.我将在同一数据帧中使用它们,因此可以利用它来创建时间序列模型输入.以下是"train_data"的示例:
I want to maintain the date sort-order, using collect_list for multiple columns, all with the same date order. I'll need them in the same dataframe so I can utilize to create a time series model input. Below is a sample of the "train_data":
我正在使用一个带有PartitionBy的窗口,以通过Tuning_evnt_start_dt为每个Syscode_Stn确保排序顺序.我可以使用以下代码创建一列:
I'm using a Window with PartitionBy to ensure sort order by tuning_evnt_start_dt for each Syscode_Stn. I can create one column with this code:
from pyspark.sql import functions as F
from pyspark.sql import Window
w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')
sorted_list_df = train_data
.withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w)
)\
.groupBy('Syscode_Stn')\
.agg(F.max('spp_imp_daily').alias('spp_imp_daily'))
但是如何在同一个新数据框中创建两个列?
but how do I create two columns in the same new dataframe?
w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')
sorted_list_df = train_data
.withColumn('spp_imp_daily',F.collect_list('spp_imp_daily').over(w))
.withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w))
.groupBy('Syscode_Stn')
.agg(F.max('spp_imp_daily').alias('spp_imp_daily')))
请注意,MarchMadInd未显示在屏幕截图中,但包含在train_data中.我如何到达自己的位置的说明: https://stackoverflow.com/a/49255498/8691976
Note that MarchMadInd is not shown in the screenshot, but is included in train_data. Explanation of how I got to where I am: https://stackoverflow.com/a/49255498/8691976
推荐答案
是的,正确的方法是添加连续的.withColumn语句,然后添加.agg语句,以删除每个数组的重复项.
Yes, the correct way is to add successive .withColumn statements, followed by a .agg statement that removes the duplicates for each array.
w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')
sorted_list_df = train_data.withColumn('spp_imp_daily',
F.collect_list('spp_imp_daily').over(w)
)\
.withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w))\
.groupBy('Syscode_Stn')\
.agg(F.max('spp_imp_daily').alias('spp_imp_daily'),
F.max('MarchMadInd').alias('MarchMadInd')
)
这篇关于如何在PySpark collect_list中维护排序顺序并收集多个列表的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!