Calculate value based on value from same column of the previous row in Spark


Question

I need to calculate a column using a formula that depends on the value computed for the previous row.

I have not been able to figure out how to do this with the withColumn API.

I need to calculate a new column using the formula:

MovingRate = MonthlyRate + (0.7 * MovingRatePrevious)

... where MovingRatePrevious is the MovingRate of the prior row.

For month 1 the value is already given, so it does not need to be recalculated, but it is needed to calculate the subsequent rows. The calculation has to be partitioned by Type.
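Unrolling the recurrence can make the dependency on earlier rows explicit. The following is only a sketch: M_n (MovingRate in month n) and r_n (MonthlyRate in month n) are shorthand introduced here, not names from the original post, M_1 is the given month-1 value, and the middle step assumes n ≥ 3.

```latex
\begin{aligned}
M_n &= r_n + 0.7\,M_{n-1} \\
    &= r_n + 0.7\,r_{n-1} + 0.7^{2}\,M_{n-2} \\
    &= \sum_{k=2}^{n} 0.7^{\,n-k}\, r_k \;+\; 0.7^{\,n-1} M_1, \qquad n \ge 2 .
\end{aligned}
```

Each row therefore depends on every earlier MonthlyRate within the same Type, which suggests why a single lag over the existing columns is not enough on its own.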

Here is my original dataset:

[dataset table not preserved in this copy]

Desired results in the MovingRate column:

[results table not preserved in this copy]

Recommended answer

Although it is possible to do this with window functions (see @Leo C's answer), I bet it is more performant to aggregate once per Type using a groupBy, and then explode the result of the UDF to get all rows back:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

val df = Seq(
  (1, "blue", 0.4, Some(0.33)),
  (2, "blue", 0.3, None),
  (3, "blue", 0.7, None),
  (4, "blue", 0.9, None)
)
.toDF("Month", "Type", "MonthlyRate", "MovingRate")

// This UDF returns a Seq of Tuple3 (Month, MonthlyRate, MovingRate).
// Rows are sorted by Month first because collect_list does not guarantee order.
val calcMovingRate = udf((startRate: Double, rates: Seq[Row]) => {
  val sorted = rates.sortBy(_.getInt(0))
  // Seed with month 1: its own MonthlyRate and the given MovingRate.
  sorted.tail.scanLeft((sorted.head.getInt(0), sorted.head.getDouble(1), startRate)) {
    // MovingRate = MonthlyRate + 0.7 * MovingRatePrevious
    (acc, curr) => (curr.getInt(0), curr.getDouble(1), curr.getDouble(1) + 0.7 * acc._3)
  }
})

df
  .groupBy($"Type")
  .agg(
    first($"MovingRate", ignoreNulls = true).as("startRate"),
    collect_list(struct($"Month", $"MonthlyRate")).as("rates")
  )
  .select($"Type", explode(calcMovingRate($"startRate", $"rates")).as("movingRates"))
  .select($"Type", $"movingRates._1".as("Month"), $"movingRates._2".as("MonthlyRate"), $"movingRates._3".as("MovingRate"))
  .show()

This gives (MovingRate rounded to four decimal places here; the raw doubles may carry floating-point noise):

+----+-----+-----------+----------+
|Type|Month|MonthlyRate|MovingRate|
+----+-----+-----------+----------+
|blue|    1|        0.4|      0.33|
|blue|    2|        0.3|     0.531|
|blue|    3|        0.7|    1.0717|
|blue|    4|        0.9|    1.6502|
+----+-----+-----------+----------+
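The scanLeft step can also be tried without Spark, which may help when checking the arithmetic. Below is a minimal sketch on a local collection, applying the formula from the question (MovingRate = MonthlyRate + 0.7 * MovingRatePrevious) to the sample "blue" rows. The helper name `movingRates` and its (Month, MonthlyRate) pair input are assumptions made for illustration and are not part of the original answer.

```scala
// Hypothetical helper (not from the original answer): applies
// MovingRate = MonthlyRate + 0.7 * MovingRatePrevious to one group's rows,
// given as (Month, MonthlyRate) pairs plus the known month-1 MovingRate.
def movingRates(startRate: Double, rates: Seq[(Int, Double)]): Seq[(Int, Double, Double)] = {
  val sorted = rates.sortBy(_._1) // process months in ascending order
  val (firstMonth, firstMonthly) = sorted.head
  // Seed the scan with month 1, whose MovingRate is given rather than computed.
  sorted.tail.scanLeft((firstMonth, firstMonthly, startRate)) {
    case ((_, _, prevMoving), (month, monthly)) =>
      (month, monthly, monthly + 0.7 * prevMoving)
  }
}

// Sample rows for Type "blue" from the answer's DataFrame.
val result = movingRates(0.33, Seq((1, 0.4), (2, 0.3), (3, 0.7), (4, 0.9)))
result.foreach(println)
```

For these inputs the MovingRate values work out to 0.33, then approximately 0.531, 1.0717 and 1.65019, up to floating-point rounding.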
