How to compute a "custom running total" in a Spark 1.5 DataFrame
Question
I have a loan payment history for each LoanID stored in a parquet file, and I am trying to calculate the "Past Due" amount for each period of each loan. This would be a simple partition-over-window task if not for the tricky way the due amount is computed.
If the customer pays less than the due amount, then the past due amount increases; on the other hand, if the customer makes an advance payment, the extra payment is ignored in subsequent periods (rows 5 and 6 in the sample below).
LoanID  Period  DueAmt  ActualPmt  PastDue
     1       1     100        100        0
     1       2     100         60      -40
     1       3     100        100      -40
     1       4     100        200        0   <== This advance payment is not rolled to next period
     1       5     100        110        0   <== This advance payment is not rolled to next period
     1       6     100         80      -20
     1       7     100         60      -60
     1       8     100        100      -60
     2       1     150        150        0
     2       2     150        150        0
     2       3     150        150        0
     3       1     200        200        0
     3       2     200        120      -80
     3       3     200        120     -160
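The rule behind the PastDue column can be sketched in plain Python (a hypothetical helper written for this explanation, not part of the question's code): keep a running balance of payment minus due, clamp it at zero whenever it goes positive, and report the balance as PastDue.

```python
def past_due_series(rows):
    """Compute the PastDue value for each period of one loan.

    rows: list of (due_amt, actual_pmt) tuples, already sorted by period.
    """
    balance = 0          # running sum of (payment - due), never above 0
    out = []
    for due, pmt in rows:
        balance += pmt - due
        if balance > 0:  # advance payments are not rolled forward
            balance = 0
        out.append(balance)
    return out

# Loan 1 from the sample table:
loan1 = [(100, 100), (100, 60), (100, 100), (100, 200),
         (100, 110), (100, 80), (100, 60), (100, 100)]
print(past_due_series(loan1))  # [0, -40, -40, 0, 0, -20, -60, -60]
```

Note how the clamp at zero is exactly what makes this a "custom" running total: a plain window sum would carry the overpayments in periods 4 and 5 forward.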
To solve this I effectively need to apply a custom function to each partition (LoanID), ordered by Period.
What options are available in Spark?
Straightforward but clumsy seems to be DF -> RDD -> groupBy, apply a lambda, then convert back to a DataFrame.
More elegant would be a custom UDAF (in Scala?) with a window function, but I can't find a single implementation example of this.
OK, so I tried the first solution, with a round trip from DataFrame to pair RDD and back:
from pyspark.sql import Row

def dueAmt(partition):
    '''
    @type partition: list
    '''
    # first sort rows by period
    sp = sorted(partition, key=lambda r: r.Period)
    res = []
    due = 0
    for r in sp:
        due += r.ActualPmt - r.DueAmt
        if due > 0:
            due = 0
        # Row is immutable, so we need to create a new row with the updated value
        d = r.asDict()
        d['CalcDueAmt'] = -due
        newRow = Row(**d)
        res.append(newRow)
    return res

df = (sqlContext.read.format('com.databricks.spark.csv')
      .options(header='true', inferschema='true')
      .load('PmtDueSample2.csv')
      .cache())
rd1 = df.rdd.map(lambda r: (r.LoanID, r))
rd2 = rd1.groupByKey()
rd3 = rd2.mapValues(dueAmt)
rd4 = rd3.flatMap(lambda t: t[1])
df2 = rd4.toDF()
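The per-group logic in dueAmt can be sanity-checked locally without a cluster. Below is a sketch where a namedtuple stands in for pyspark's Row (so PmtRow and due_amt_local are test doubles introduced here, not part of the pipeline above):

```python
from collections import namedtuple

# A stand-in for pyspark's Row, just for local testing of the dueAmt logic.
PmtRow = namedtuple('PmtRow', ['LoanID', 'Period', 'DueAmt', 'ActualPmt'])

def due_amt_local(partition):
    """Same running-total logic as dueAmt above, returning plain dicts."""
    res = []
    due = 0
    for r in sorted(partition, key=lambda r: r.Period):
        due += r.ActualPmt - r.DueAmt
        if due > 0:
            due = 0
        d = dict(r._asdict())
        d['CalcDueAmt'] = -due
        res.append(d)
    return res

# Loan 3 from the sample, deliberately given out of order to check the sort:
loan3 = [PmtRow(3, 2, 200, 120), PmtRow(3, 1, 200, 200), PmtRow(3, 3, 200, 120)]
print([d['CalcDueAmt'] for d in due_amt_local(loan3)])  # [0, 80, 160]
```

Note CalcDueAmt is negated, so it comes out positive where the table shows a negative PastDue.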
Seems to work.

On this journey I actually discovered a couple of bugs in the pyspark implementation:
- The implementation of __call__ in the Row class is wrong.
- There is an annoying bug in the Row constructor: for no obvious reason __new__ sorts the columns, so at the end of the journey my result table had its columns in alphabetical order. This just makes it harder to read the final result.
Answer
Neither pretty nor efficient, but it should give you something to work with. Let's start with creating and registering a table:
val df = sc.parallelize(Seq(
  (1, 1, 100, 100), (1, 2, 100, 60), (1, 3, 100, 100),
  (1, 4, 100, 200), (1, 5, 100, 110), (1, 6, 100, 80),
  (1, 7, 100, 60), (1, 8, 100, 100), (2, 1, 150, 150),
  (2, 2, 150, 150), (2, 3, 150, 150), (3, 1, 200, 200),
  (3, 2, 200, 120), (3, 3, 200, 120)
)).toDF("LoanID", "Period", "DueAmt", "ActualPmt")

df.registerTempTable("df")
Next let's define and register a UDF:
case class Record(period: Int, dueAmt: Int, actualPmt: Int, pastDue: Int)

def runningPastDue(idxs: Seq[Int], dues: Seq[Int], pmts: Seq[Int]) = {
  def f(acc: List[(Int, Int, Int, Int)], x: (Int, (Int, Int))) =
    (acc.head, x) match {
      case ((_, _, _, pastDue), (idx, (due, pmt))) =>
        (idx, due, pmt, (pmt - due + pastDue).min(0)) :: acc
    }

  idxs.zip(dues.zip(pmts))
    .toList
    .sortBy(_._1)
    .foldLeft(List((0, 0, 0, 0)))(f)
    .reverse
    .tail
    .map{ case (i, due, pmt, past) => Record(i, due, pmt, past) }
}

sqlContext.udf.register("runningPastDue", runningPastDue _)
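The fold inside runningPastDue can be mirrored in Python to check its arithmetic locally (a re-implementation for illustration, not the answer's code): sort the zipped records by period, thread the clamped past-due balance through an accumulator list whose head is the newest record, then reverse and drop the seed.

```python
from functools import reduce

def running_past_due(idxs, dues, pmts):
    """Python mirror of the Scala foldLeft in runningPastDue."""
    def step(acc, x):
        _, _, _, past_due = acc[0]        # newest record sits at the head
        idx, (due, pmt) = x
        return [(idx, due, pmt, min(pmt - due + past_due, 0))] + acc

    records = sorted(zip(idxs, zip(dues, pmts)), key=lambda t: t[0])
    folded = reduce(step, records, [(0, 0, 0, 0)])
    return list(reversed(folded))[1:]     # drop the (0, 0, 0, 0) seed

# Loan 3 from the sample, with periods supplied out of order:
print(running_past_due([2, 1, 3], [200, 200, 200], [120, 200, 120]))
# [(1, 200, 200, 0), (2, 200, 120, -80), (3, 200, 120, -160)]
```

The `min(..., 0)` is the same clamp as in the question's loop: a positive balance (an advance payment) is discarded instead of rolled forward.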
Aggregate and compute:
val aggregated = sqlContext.sql("""
  SELECT LoanID, explode(pmts) pmts FROM (
    SELECT LoanId,
      runningPastDue(
        collect_list(Period),
        collect_list(DueAmt),
        collect_list(ActualPmt)
      ) pmts
    FROM df GROUP BY LoanID) tmp""")

val flattenExprs = List("Period", "DueAmt", "ActualPmt", "PastDue")
  .zipWithIndex
  .map{ case (c, i) => col(s"tmp._${i+1}").alias(c) }
Finally flatten:
val result = aggregated.select($"LoanID" :: flattenExprs: _*)