Pyspark: Cumulative Sum with reset condition

Question

We have a dataframe like this:

+------+--------------------+
| Flag |               value|
+------+--------------------+
|1     |5                   |
|1     |4                   |
|1     |3                   |
|1     |5                   |
|1     |6                   |
|1     |4                   |
|1     |7                   |
|1     |5                   |
|1     |2                   |
|1     |3                   |
|1     |2                   |
|1     |6                   |
|1     |9                   |      
+------+--------------------+

After a normal cumsum we get this:

+------+--------------------+----------+
| Flag |               value|cumsum    |
+------+--------------------+----------+
|1     |5                   |5         |
|1     |4                   |9         |
|1     |3                   |12        |
|1     |5                   |17        |
|1     |6                   |23        |
|1     |4                   |27        |
|1     |7                   |34        |
|1     |5                   |39        |
|1     |2                   |41        |
|1     |3                   |44        |
|1     |2                   |46        |
|1     |6                   |52        |
|1     |9                   |61        |       
+------+--------------------+----------+

Now what we want is for the cumsum to reset when a specific condition is met, e.g. when it crosses 20.

Below is the expected output:

+------+--------------------+----------+---------+
| Flag |               value|cumsum    |expected |
+------+--------------------+----------+---------+
|1     |5                   |5         |5        |
|1     |4                   |9         |9        |
|1     |3                   |12        |12       |
|1     |5                   |17        |17       |
|1     |6                   |23        |23       |
|1     |4                   |27        |4        |  <-----reset 
|1     |7                   |34        |11       |
|1     |5                   |39        |16       |
|1     |2                   |41        |18       |
|1     |3                   |44        |21       |
|1     |2                   |46        |2        |  <-----reset
|1     |6                   |52        |8        |
|1     |9                   |61        |17       |         
+------+--------------------+----------+---------+
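The reset rule behind the expected column can be stated in plain Python, independently of Spark. This is a minimal sketch of the intended semantics (the function name and threshold parameter are ours, not from the question):

```python
def cumsum_with_reset(values, threshold=20):
    """Running sum that restarts from the current value on the row
    after the running sum has crossed the threshold."""
    out, acc = [], 0
    for v in values:
        acc = v if acc > threshold else acc + v
        out.append(acc)
    return out

values = [5, 4, 3, 5, 6, 4, 7, 5, 2, 3, 2, 6, 9]
print(cumsum_with_reset(values))
# → [5, 9, 12, 17, 23, 4, 11, 16, 18, 21, 2, 8, 17]
```

This reproduces the expected column above: the sum is allowed to cross 20 once, then restarts from the next row's value.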

This is how we are calculating the cumulative sum:

win_counter = (Window
               .partitionBy('Flag')
               .orderBy(F.monotonically_increasing_id())
               .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# Without an ordering and a running frame, F.sum over the window returns
# the partition total on every row instead of a running sum.
df_partitioned = df_partitioned.withColumn('cumsum', F.sum(F.col('value')).over(win_counter))

Answer

There are two ways I've found to solve it without a udf:

SQL AGGREGATE

from pyspark.sql.window import Window
import pyspark.sql.functions as f


df = spark.createDataFrame([
  (1, 5), (1, 4), (1, 3), (1, 5), (1, 6), (1, 4),
  (1, 7), (1, 5), (1, 2), (1, 3), (1, 2), (1, 6), (1, 9)
], schema='Flag int, value int')

w = (Window
     .partitionBy('flag')
     .orderBy(f.monotonically_increasing_id())
     .rowsBetween(Window.unboundedPreceding, Window.currentRow))
df = df.withColumn('values', f.collect_list('value').over(w))

expr = "AGGREGATE(values, 0, (acc, el) -> IF(acc < 20, acc + el, el))"
df = df.select('Flag', 'value', f.expr(expr).alias('cumsum'))

df.show(truncate=False)
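The AGGREGATE expression is a left fold over the collected prefix of values. The same fold in plain Python (a stand-alone sketch, no Spark needed; the names are ours) makes the reset logic explicit:

```python
from functools import reduce

def fold_cumsum(prefix, threshold=20):
    # Mirrors AGGREGATE(values, 0, (acc, el) -> IF(acc < 20, acc + el, el)):
    # keep adding while the accumulator is below the threshold, else restart.
    return reduce(lambda acc, el: acc + el if acc < threshold else el, prefix, 0)

values = [5, 4, 3, 5, 6, 4]
print([fold_cumsum(values[:i + 1]) for i in range(len(values))])
# → [5, 9, 12, 17, 23, 4]
```

Note that collect_list plus AGGREGATE re-folds every prefix, so this approach is quadratic in the partition size; it is fine for modest partitions but the RDD version below does a single pass.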

RDD

df = spark.createDataFrame([
  (1, 5), (1, 4), (1, 3), (1, 5), (1, 6), (1, 4),
  (1, 7), (1, 5), (1, 2), (1, 3), (1, 2), (1, 6), (1, 9)
], schema='Flag int, value int')

def cumsum_by_flag(rows):
  cumsum, reset = 0, False
  for row in rows:
    if reset:
      cumsum = row.value
      reset = False
    else:
      cumsum += row.value
      
    reset = cumsum > 20
    yield row.value, cumsum
    
    
def unpack(value):
  flag = value[0]
  value, cumsum = value[1]
  return flag, value, cumsum


rdd = df.rdd.keyBy(lambda row: row.Flag)
rdd = (rdd
       .groupByKey()
       .flatMapValues(cumsum_by_flag)
       .map(unpack))

df = rdd.toDF('Flag int, value int, cumsum int')
df.show(truncate=False)

Output:

+----+-----+------+
|Flag|value|cumsum|
+----+-----+------+
|1   |5    |5     |
|1   |4    |9     |
|1   |3    |12    |
|1   |5    |17    |
|1   |6    |23    |
|1   |4    |4     |
|1   |7    |11    |
|1   |5    |16    |
|1   |2    |18    |
|1   |3    |21    |
|1   |2    |2     |
|1   |6    |8     |
|1   |9    |17    |
+----+-----+------+
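Because cumsum_by_flag only relies on rows exposing a .value attribute, its logic can be exercised without a Spark session; here a namedtuple stands in for pyspark.sql.Row (an illustrative test harness, not part of the answer):

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row with the two fields the generator reads.
Row = namedtuple('Row', ['Flag', 'value'])

def cumsum_by_flag(rows):
    cumsum, reset = 0, False
    for row in rows:
        if reset:
            cumsum = row.value
            reset = False
        else:
            cumsum += row.value
        reset = cumsum > 20
        yield row.value, cumsum

rows = [Row(1, v) for v in [5, 4, 3, 5, 6, 4]]
print(list(cumsum_by_flag(rows)))
# → [(5, 5), (4, 9), (3, 12), (5, 17), (6, 23), (4, 4)]
```

One caveat worth checking in production: groupByKey does not guarantee the order of values within a group, so the single-pass reset logic is only correct if the rows arrive in the intended order.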

