How to compute a "custom running total" in a Spark 1.5 DataFrame

Problem description

I have a loan payment history for each LoanId stored in a Parquet file and I am trying to calculate the "Past Due" amount for each period of each loan. This would be a simple partition-over-window task if not for the tricky way the due amount is computed.

If the customer pays less than the amount due, the past-due amount grows; on the other hand, if the customer makes an advance payment, the extra payment is ignored in subsequent periods (rows 5 and 6 in the sample below).

LoanID  Period  DueAmt  ActualPmt   PastDue
1       1       100     100             0
1       2       100     60              -40
1       3       100     100             -40
1       4       100     200             0   <== This advance payment is not rolled to next period
1       5       100     110             0   <== This advance payment is not rolled to next period
1       6       100     80              -20
1       7       100     60              -60
1       8       100     100             -60
2       1       150     150             0
2       2       150     150             0
2       3       150     150             0
3       1       200     200             0
3       2       200     120             -80
3       3       200     120             -160
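
In other words, the running balance carries forward only while it is negative. As a quick illustration (not from the original post; plain Scala, matching the language of the answer below), the PastDue column for loan 1 follows the recurrence pastDue(t) = min(0, pastDue(t-1) + ActualPmt(t) - DueAmt(t)):

val due = Seq(100, 100, 100, 100, 100, 100, 100, 100)  // DueAmt for loan 1
val pmt = Seq(100,  60, 100, 200, 110,  80,  60, 100)  // ActualPmt for loan 1

// Running balance, capped at zero so advance payments do not carry forward
val pastDue = pmt.zip(due)
  .scanLeft(0) { case (acc, (p, d)) => math.min(0, acc + p - d) }
  .tail
// pastDue == Seq(0, -40, -40, 0, 0, -20, -60, -60), the PastDue column for loan 1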

To solve this I effectively need to apply a custom function to each partition (LoanID), ordered by Period.

What options are available in Spark?

The straightforward but clunky route seems to be DF -> RDD -> groupByKey, apply a lambda, and convert back to a DataFrame.

More elegant would be a custom UDAF (in Scala?) combined with a window function, but I can't find a single implementation example of this.

OK, so I tried the first solution, with a round trip from DataFrame to pair RDD and back:

    from pyspark.sql import Row

    def dueAmt(partition):
        '''
        @type partition: list
        '''
        # first sort the rows by period
        sp = sorted(partition, key=lambda r: r.Period)
        res = []
        due = 0
        for r in sp:
            due += r.ActualPmt - r.DueAmt
            if due > 0:
                due = 0
            # Row is immutable, so we need to create a new Row with the updated value
            d = r.asDict()
            d['CalcDueAmt'] = -due
            newRow = Row(**d)
            res.append(newRow)
        return res

    df = sqlContext.read.format('com.databricks.spark.csv') \
        .options(header='true', inferSchema='true') \
        .load('PmtDueSample2.csv').cache()
    rd1 = df.rdd.map(lambda r: (r.LoanID, r))
    rd2 = rd1.groupByKey()
    rd3 = rd2.mapValues(dueAmt)
    rd4 = rd3.flatMap(lambda t: t[1])
    df2 = rd4.toDF()

It seems to work.

On this journey I actually discovered a couple of bugs in the pyspark implementation:

  1. The implementation of __call__ in the Row class is wrong.
  2. There is an annoying bug in Row's constructor: for no apparent reason __new__ sorts the columns, so at the end of the journey my result table had its columns ordered alphabetically. This just makes it harder to look at the final result.

Recommended answer

Neither pretty nor efficient, but it should give you something to work with. Let's start with creating and registering a table:

val df = sc.parallelize(Seq(
  (1, 1, 100, 100), (1, 2, 100, 60), (1, 3, 100, 100),
  (1, 4, 100, 200), (1, 5, 100, 110), (1, 6, 100, 80),
  (1, 7, 100, 60), (1, 8, 100, 100), (2, 1, 150, 150),
  (2, 2, 150, 150), (2, 3, 150, 150), (3, 1, 200, 200),
  (3, 2, 200, 120), (3, 3, 200, 120)
)).toDF("LoanID", "Period", "DueAmt", "ActualPmt")

df.registerTempTable("df")

Next, let's define and register a UDF:

case class Record(period: Int, dueAmt: Int, actualPmt: Int, pastDue: Int)

def runningPastDue(idxs: Seq[Int], dues: Seq[Int], pmts: Seq[Int]) = {
  // Prepend a (period, due, pmt, pastDue) tuple; pastDue is the previous
  // pastDue plus (payment - due), capped at zero so advance payments
  // do not carry forward.
  def f(acc: List[(Int, Int, Int, Int)], x: (Int, (Int, Int))) = 
    (acc.head, x) match {
      case ((_, _, _, pastDue), (idx, (due, pmt))) => 
        (idx, due, pmt, (pmt - due + pastDue).min(0)) :: acc
    }

  // Zip the columns together, sort by period, fold with a dummy seed row,
  // restore chronological order, drop the seed and convert to Records.
  idxs.zip(dues.zip(pmts))
    .toList
    .sortBy(_._1)
    .foldLeft(List((0, 0, 0, 0)))(f)
    .reverse
    .tail
    .map{ case (i, due, pmt, past) => Record(i, due, pmt, past) }
}

sqlContext.udf.register("runningPastDue", runningPastDue _)
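
As a quick sanity check (not part of the original answer), the function can be called directly on the first three periods of loan 1 from the sample table; the expected past-due values are 0, -40 and -40:

// Plain Scala call, no Spark involved
val check = runningPastDue(Seq(1, 2, 3), Seq(100, 100, 100), Seq(100, 60, 100))
// check: List(Record(1,100,100,0), Record(2,100,60,-40), Record(3,100,100,-40))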

Aggregate and compute the sums:

val aggregated = sqlContext.sql("""
  SELECT LoanID, explode(pmts) pmts FROM (
    SELECT LoanId, 
           runningPastDue(
             collect_list(Period), 
             collect_list(DueAmt), 
             collect_list(ActualPmt)
           ) pmts
    FROM df GROUP BY LoanID) tmp""")
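
At this point aggregated should contain one row per loan and period: the LoanID plus a pmts struct holding the Record fields. A quick way to confirm the shape (an inspection step, not part of the original answer):

// pmts should be a struct<period:int, dueAmt:int, actualPmt:int, pastDue:int>
aggregated.printSchema()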

import org.apache.spark.sql.functions.col

// Pull the fields of the exploded pmts struct (named after the Record fields,
// resolved case-insensitively) back out into top-level columns.
val flattenExprs = List("Period", "DueAmt", "ActualPmt", "PastDue")
  .map(c => col(s"pmts.$c").alias(c))

Finally, flatten:

val result = aggregated.select($"LoanID" :: flattenExprs: _*)
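
If everything lines up, result should reproduce the PastDue column from the sample table; a quick way to eyeball it (a usage sketch, not from the original answer):

// For loan 1 the PastDue column should read 0, -40, -40, 0, 0, -20, -60, -60
result.orderBy("LoanID", "Period").show()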
