PySpark first and last function over a partition in one go


Problem description

I have PySpark code like this:

import pyspark.sql.functions as F

# Sort the rows, then take the first/last values within each group.
spark_df = spark_df.orderBy('id', 'a1', 'c1')
out_df = spark_df.groupBy('id', 'a1', 'a2').agg(
    F.first('c1').alias('c1'),
    F.last('c2').alias('c2'),
    F.first('c3').alias('c3'))

I need to keep the data ordered by id, a1 and c1, and then select the columns as shown above over the groups defined by the keys id, a1 and a2.
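
To make this concrete, a minimal hypothetical input could look like the following; the column names come from the code above, but the values are invented purely for illustration.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data using the question's column names.
spark_df = spark.createDataFrame(
    [(1, 'x', 'p', 10, 'u', 100),
     (1, 'x', 'p', 20, 'v', 200),
     (1, 'x', 'p', 30, 'w', 300),
     (2, 'y', 'q',  5, 'm',  50)],
    ['id', 'a1', 'a2', 'c1', 'c2', 'c3'])

# Within the group (1, 'x', 'p'), ordered by c1, the desired output row is
# c1 = 10 (first c1), c2 = 'w' (last c2) and c3 = 100 (first c3).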

Due to the non-determinism of first and last, I changed the code to this ugly-looking version, which works, but I am not sure it is efficient.

from pyspark.sql import Window

# Rank rows within each group by c1 ascending (for "first") and descending
# (for "last"), then keep only the top-ranked row of each ordering.
w_first = Window.partitionBy('id', 'a1', 'a2').orderBy('c1')
w_last = Window.partitionBy('id', 'a1', 'a2').orderBy(F.desc('c1'))

out_first = spark_df.withColumn('Rank_First', F.rank().over(w_first)).filter(F.col('Rank_First') == 1).drop(
    'Rank_First')
out_last = spark_df.withColumn('Rank_Last', F.rank().over(w_last)).filter(F.col('Rank_Last') == 1).drop(
    'Rank_Last')

out_first = out_first.withColumnRenamed('c1', 'First_c1') \
    .withColumnRenamed('c2', 'First_c2') \
    .withColumnRenamed('c3', 'First_c3')

out_last = out_last.withColumnRenamed('c1', 'Last_c1') \
    .withColumnRenamed('c2', 'Last_c2') \
    .withColumnRenamed('c3', 'Last_c3')

out_df = out_first.join(out_last, ['id', 'a1', 'a2']) \
    .select('id', 'a1', 'a2', F.col('First_c1').alias('c1'),
            F.col('Last_c2').alias('c2'),
            F.col('First_c3').alias('c3'))

I was looking for a better and more efficient alternative; I run into performance bottlenecks when the data size is huge.

Is there a better alternative to do first and last over a window, ordered in a specific order, in one go?

Recommended answer

You can still guarantee determinism for the last and first functions by using a Window with ordering, but you need to define the frame bounds with rowsBetween when specifying the Window so that last returns the correct values (as per this post).

Try this:

from pyspark.sql import Window
from pyspark.sql.functions import first, last

# Order by c1 inside each group and make the frame span the whole partition,
# so that last sees every row of the group, not just rows up to the current one.
w = Window.partitionBy('id', 'a1', 'a2').orderBy('c1') \
          .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# These window columns are constant within each (id, a1, a2) group.
df = spark_df.withColumn("First_c1", first("c1").over(w)) \
             .withColumn("First_c3", first("c3").over(w)) \
             .withColumn("Last_c2", last("c2").over(w))

# Collapse each group to a single row; every row of a group carries the same
# First_/Last_ values, so an unordered first() is safe here.
df.groupby("id", "a1", "a2") \
  .agg(first("First_c1").alias("c1"),
       first("Last_c2").alias("c2"),
       first("First_c3").alias("c3")
  ).show()
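
The rowsBetween frame is the key detail: with only orderBy, the default frame ends at the current row, so last would simply return the current row's value. Because the frame covers the whole partition, the First_/Last_ columns are identical on every row of a group and the final groupby just collapses them. As a quick sanity check against the hypothetical sample input sketched above, one would expect:

# Collect the aggregated result keyed by the group columns and verify the
# expected first/last values for the hypothetical sample rows.
result = {(r['id'], r['a1'], r['a2']): (r['c1'], r['c2'], r['c3'])
          for r in df.groupby("id", "a1", "a2")
                     .agg(first("First_c1").alias("c1"),
                          first("Last_c2").alias("c2"),
                          first("First_c3").alias("c3"))
                     .collect()}

assert result[(1, 'x', 'p')] == (10, 'w', 100)  # first c1, last c2, first c3
assert result[(2, 'y', 'q')] == (5, 'm', 50)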
