PySpark first and last function over a partition in one go
Question
I have PySpark code like this:
spark_df = spark_df.orderBy('id', 'a1', 'c1')
out_df = spark_df.groupBy('id', 'a1', 'a2').agg(
    F.first('c1').alias('c1'),
    F.last('c2').alias('c2'),
    F.first('c3').alias('c3'))
I need to keep the data ordered by id, a1 and c1, and then select the columns shown above over the groups defined by the keys id, a1 and a2.
Because first and last are non-deterministic, I changed the code to this ugly-looking version. It works, but I'm not sure it is efficient.
w_first = Window.partitionBy('id', 'a1', 'a2').orderBy('c1')
w_last = Window.partitionBy('id', 'a1', 'a2').orderBy(F.desc('c1'))

out_first = spark_df.withColumn('Rank_First', F.rank().over(w_first)) \
    .filter(F.col('Rank_First') == 1).drop('Rank_First')
out_last = spark_df.withColumn('Rank_Last', F.rank().over(w_last)) \
    .filter(F.col('Rank_Last') == 1).drop('Rank_Last')

out_first = out_first.withColumnRenamed('c1', 'First_c1') \
    .withColumnRenamed('c2', 'First_c2') \
    .withColumnRenamed('c3', 'First_c3')
out_last = out_last.withColumnRenamed('c1', 'Last_c1') \
    .withColumnRenamed('c2', 'Last_c2') \
    .withColumnRenamed('c3', 'Last_c3')

out_df = out_first.join(out_last, ['id', 'a1', 'a2']) \
    .select('id', 'a1', 'a2',
            F.col('First_c1').alias('c1'),
            F.col('Last_c2').alias('c2'),
            F.col('First_c3').alias('c3'))
I was looking for a better, more efficient alternative; I run into performance bottlenecks when the data size is huge.
Is there a better way to compute first and last over a window, ordered in a specific order, in one go?
Answer
You can still guarantee determinism for the last and first functions by using a Window with ordering. You also need to define the frame bounds with rowsBetween when specifying the Window so that last returns the correct value (as per this post): without an explicit frame, an ordered window defaults to a frame that ends at the current row, so last would just return the current row's value.
Try this:
from pyspark.sql import Window
from pyspark.sql import functions as F

# Unbounded frame so that last() sees the whole partition,
# not just the rows up to the current one.
w = Window.partitionBy('id', 'a1', 'a2').orderBy('c1') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df = df.withColumn("First_c1", F.first("c1").over(w)) \
    .withColumn("First_c3", F.first("c3").over(w)) \
    .withColumn("Last_c2", F.last("c2").over(w))

df.groupBy("id", "a1", "a2") \
    .agg(F.first("First_c1").alias("c1"),
         F.first("Last_c2").alias("c2"),
         F.first("First_c3").alias("c3")) \
    .show()