How to maintain sort order in PySpark collect_list and collect multiple lists
Problem description
I want to maintain the date sort order when using collect_list for multiple columns, all with the same date ordering. I'll need them in the same dataframe so I can use them to create a time series model input. Below is a sample of the "train_data":
I'm using a Window with partitionBy to ensure the rows are sorted by tuning_evnt_start_dt within each Syscode_Stn. I can create one column with this code:
from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

# Running collect_list over the ordered window, then keep the longest array per station
sorted_list_df = train_data\
    .withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w))\
    .groupBy('Syscode_Stn')\
    .agg(F.max('spp_imp_daily').alias('spp_imp_daily'))
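To make the mechanics concrete, here is a minimal, self-contained sketch with made-up data; the station ID and values are hypothetical, but the column names match the question. Because the window has an orderBy, collect_list runs over a growing frame, so each row receives a prefix of the final list:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample rows for one station (values invented for illustration)
train_data = spark.createDataFrame(
    [('CBS_1001', '2018-01-01', 0.75),
     ('CBS_1001', '2018-01-02', 0.80),
     ('CBS_1001', '2018-01-03', 0.60)],
    ['Syscode_Stn', 'tuning_evnt_start_dt', 'spp_imp_daily'])

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

# Each row gets the running list so far:
# [0.75], [0.75, 0.8], [0.75, 0.8, 0.6]
train_data.withColumn('lst', F.collect_list('spp_imp_daily').over(w))\
    .show(truncate=False)

The groupBy/agg step above then collapses each station to a single row holding the longest of those running lists.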
But how do I create two columns in the same new dataframe?
w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

sorted_list_df = train_data\
    .withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w))\
    .withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w))\
    .groupBy('Syscode_Stn')\
    .agg(F.max('spp_imp_daily').alias('spp_imp_daily'))
Note that MarchMadInd is not shown in the screenshot, but is included in train_data. Explanation of how I got to where I am: https://stackoverflow.com/a/49255498/8691976
Answer
Yes, the correct way is to add successive .withColumn statements, followed by an .agg statement that drops the duplicate rows in each group, keeping only the complete array for each column.
w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

sorted_list_df = train_data\
    .withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w))\
    .withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w))\
    .groupBy('Syscode_Stn')\
    .agg(F.max('spp_imp_daily').alias('spp_imp_daily'),
         F.max('MarchMadInd').alias('MarchMadInd'))
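Why F.max recovers the full, date-ordered list: the ordered window gives every row a prefix of the final array, and when one array is a prefix of another, Spark's array ordering ranks the longer one higher, so max selects the complete list for each group. A quick check, continuing the hypothetical data from the sketch above with an invented MarchMadInd column:

# MarchMadInd values are invented for illustration
train_data = spark.createDataFrame(
    [('CBS_1001', '2018-01-01', 0.75, 0),
     ('CBS_1001', '2018-01-02', 0.80, 0),
     ('CBS_1001', '2018-01-03', 0.60, 1)],
    ['Syscode_Stn', 'tuning_evnt_start_dt', 'spp_imp_daily', 'MarchMadInd'])

result = train_data\
    .withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w))\
    .withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w))\
    .groupBy('Syscode_Stn')\
    .agg(F.max('spp_imp_daily').alias('spp_imp_daily'),
         F.max('MarchMadInd').alias('MarchMadInd'))

result.show(truncate=False)
# Expected: one row per station, with both lists in the same date order, e.g.
# |CBS_1001|[0.75, 0.8, 0.6]|[0, 0, 1]|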