How to compute "custom running total" in spark 1.5 dataframe

Problem description

I have a loan payment history for each LoanID stored in a Parquet file and am trying to calculate the "Past Due" amount for each period of each loan. This would be a simple partition-over-window task if not for the tricky nature of how the past due amount is computed.

If the customer pays less than the due amount, the past due amount increases; on the other hand, if the customer makes an advance payment, the extra payment is ignored in subsequent periods (rows 5 and 6 in the sample below).

LoanID  Period  DueAmt  ActualPmt   PastDue
1       1       100     100             0
1       2       100     60              -40
1       3       100     100             -40
1       4       100     200             0   <== This advance payment is not rolled to next period
1       5       100     110             0   <== This advance payment is not rolled to next period
1       6       100     80              -20
1       7       100     60              -60
1       8       100     100             -60
2       1       150     150             0
2       2       150     150             0
2       3       150     150             0
3       1       200     200             0
3       2       200     120             -80
3       3       200     120             -160
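
The core recurrence is easier to see outside Spark: walk a loan's periods in order, carry the running balance forward, and cap it at zero whenever a period ends up overpaid. A minimal sketch in plain Scala (no Spark), using the LoanID 1 figures from the table above:

val dueAmts    = Seq(100, 100, 100, 100, 100, 100, 100, 100)  // DueAmt for LoanID 1
val actualPmts = Seq(100,  60, 100, 200, 110,  80,  60, 100)  // ActualPmt for LoanID 1

val pastDue = dueAmts.zip(actualPmts)
  .scanLeft(0) { case (balance, (due, pmt)) => math.min(balance + pmt - due, 0) }
  .tail  // drop the initial 0 seed

// pastDue == Seq(0, -40, -40, 0, 0, -20, -60, -60), matching the PastDue column above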

To solve this I effectively need to apply a custom function to each partition (LoanID), ordered by Period.

What options are available in Spark?

Straightforward but complicated seems to be DF -> RDD -> groupBy, apply a lambda, then convert back to a DataFrame.

More elegant would be a custom UDAF (in Scala?) with a window function, but I can't find a single implementation example of this.


OK, so I tried the first solution, with a round trip from DataFrame to pair RDD and back:

    from pyspark.sql import Row 
    def dueAmt(partition):
        '''
        @type partition:list 
        '''
        #first sort rows
        sp=sorted(partition, key=lambda r: r.Period )
        res=[]
        due=0
        for r in sp:
            due+=r.ActualPmt-r.DueAmt
            if due>0: due=0;
            #row is immutable so we need to create new row with updated value
            d=r.asDict()
            d['CalcDueAmt']=-due
            newRow=Row(**d)
            res.append(newRow)
        return res    

    df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('PmtDueSample2.csv').cache()
    rd1=df.rdd.map(lambda r: (r.LoanID, r ) )
    rd2=rd1.groupByKey()
    rd3=rd2.mapValues(dueAmt)
    rd4=rd3.flatMap(lambda t: t[1] )
    df2=rd4.toDF()

Seems to work.

On this journey I actually discovered a couple of bugs in the pyspark implementation.

  1. The implementation of __call__ in class Row is wrong.
  2. An annoying bug in the Row constructor: for no obvious reason __new__ sorts the columns, so at the end of the journey my resulting table had its columns ordered alphabetically. This simply makes it harder to look at the final result.

Solution

Neither pretty nor efficient, but it should give you something to work with. Let's start by creating and registering a table:

val df = sc.parallelize(Seq(
  (1, 1, 100, 100), (1, 2, 100, 60), (1, 3, 100, 100),
  (1, 4, 100, 200), (1, 5, 100, 110), (1, 6, 100, 80),
  (1, 7, 100, 60), (1, 8, 100, 100), (2, 1, 150, 150),
  (2, 2, 150, 150), (2, 3, 150, 150), (3, 1, 200, 200),
  (3, 2, 200, 120), (3, 3, 200, 120)
)).toDF("LoanID", "Period", "DueAmt", "ActualPmt")

df.registerTempTable("df")

Next, let's define and register a UDF:

case class Record(period: Int, dueAmt: Int, actualPmt: Int, pastDue: Int)

def runningPastDue(idxs: Seq[Int], dues: Seq[Int], pmts: Seq[Int]) = {
  def f(acc: List[(Int, Int, Int, Int)], x: (Int, (Int, Int))) = 
    (acc.head, x) match {
      case ((_, _, _, pastDue), (idx, (due, pmt))) => 
        (idx, due, pmt, (pmt - due + pastDue).min(0)) :: acc
    }

  idxs.zip(dues.zip(pmts))
    .toList
    .sortBy(_._1)
    .foldLeft(List((0, 0, 0, 0)))(f)
    .reverse
    .tail
    .map{ case (i, due, pmt, past) => Record(i, due, pmt, past) }
}

sqlContext.udf.register("runningPastDue", runningPastDue _)
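
Because the UDF body is ordinary Scala, it can be sanity-checked locally before wiring it into SQL. Feeding it the LoanID 1 figures from the question should reproduce the expected PastDue column:

val check = runningPastDue(
  Seq(1, 2, 3, 4, 5, 6, 7, 8),                  // Period
  Seq(100, 100, 100, 100, 100, 100, 100, 100),  // DueAmt
  Seq(100, 60, 100, 200, 110, 80, 60, 100)      // ActualPmt
)

// check.map(_.pastDue) == List(0, -40, -40, 0, 0, -20, -60, -60)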

Aggregate, and compute sums:

val aggregated = sqlContext.sql("""
  SELECT LoanID, explode(pmts) pmts FROM (
    SELECT LoanId, 
           runningPastDue(
             collect_list(Period), 
             collect_list(DueAmt), 
             collect_list(ActualPmt)
           ) pmts
    FROM df GROUP BY LoanID) tmp""")

val flattenExprs = List("Period", "DueAmt", "ActualPmt", "PastDue")
  .zipWithIndex
  .map{case (c, i) => col(s"tmp._${i+1}").alias(c)}

Finally flatten:

val result = aggregated.select($"LoanID" :: flattenExprs: _*)
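
Row order after the group-by and explode is not guaranteed, so when eyeballing the output against the table in the question it helps to sort first, for example:

result.orderBy("LoanID", "Period").show()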
