How to apply a window function in an in-memory transformation with a new column (Scala)


Problem description

I have a DataFrame that I want to transform into the output below, where each row's start_duration and end_duration are derived from the previous row's start_duration and end_duration. Please let me know how to achieve this in Spark using Scala.

Below are the formulas for start_duration and end_duration:

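start_duration = max(previous end_duration + 1, date)
end_duration   = min(end_date, start_duration + duration - 1)

Here date is the current dispensation date, end_date is the prescription end date, and all arithmetic is in days. The first row of each patient has no previous end_duration, so its start_duration is simply its date.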
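Because each start_duration depends on the previous row's computed end_duration rather than on a column that already exists, the formulas describe a running recurrence, not an ordinary window aggregate. A minimal plain-Scala sketch (no Spark; Dispensation and DurationRecurrence are hypothetical names) folds one patient's rows, already sorted by date, from left to right:

```scala
import java.time.LocalDate

// Hypothetical row type: one dispensation with the columns the formulas use.
final case class Dispensation(date: LocalDate, duration: Int, endDate: LocalDate)

object DurationRecurrence {
  // Applies both formulas row by row. rows must belong to one patient and be sorted by date.
  // Returns (start_duration, end_duration) for each row.
  def compute(rows: Seq[Dispensation]): Seq[(LocalDate, LocalDate)] = {
    val init = (Option.empty[LocalDate], Vector.empty[(LocalDate, LocalDate)])
    val (_, result) = rows.foldLeft(init) { case ((prevEnd, acc), row) =>
      // start_duration = max(previous end_duration + 1, date); the first row just uses date.
      val start = prevEnd.map(_.plusDays(1)) match {
        case Some(next) if next.isAfter(row.date) => next
        case _                                    => row.date
      }
      // end_duration = min(end_date, start_duration + duration - 1)
      val candidateEnd = start.plusDays(row.duration - 1L)
      val end = if (candidateEnd.isBefore(row.endDate)) candidateEnd else row.endDate
      (Option(end), acc :+ ((start, end)))
    }
    result
  }

  def main(args: Array[String]): Unit = {
    val prescriptionEnd = LocalDate.parse("2015-12-01")
    val rows = Seq("2015-06-10", "2015-07-15", "2015-08-01", "2015-10-01")
      .map(d => Dispensation(LocalDate.parse(d), 30, prescriptionEnd))
    compute(rows).foreach { case (start, end) => println(s"$start $end") }
  }
}
```

Applied literally to the sample input, the formulas give 2015-09-12 as the third row's end_duration (2015-08-14 plus 29 days), one day earlier than the 2015-09-13 in the expected output table. In Spark, one possible way to run such a fold is to group the rows per patient, for example with Dataset.groupByKey followed by flatMapGroups, and sort each group by date inside the function; that wiring is not shown here.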

Below is my input DataFrame:

+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+
|prescription_uid|patient_uid|ndc      |label      |dispensation_uid|date      |duration|start_date|end_date  |
+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+
|0               |0          |16714-128|sinvastatin|0               |2015-06-10|30      |2015-06-01|2015-12-01|
|0               |0          |16714-128|sinvastatin|1               |2015-07-15|30      |2015-06-01|2015-12-01|
|0               |0          |16714-128|sinvastatin|2               |2015-08-01|30      |2015-06-01|2015-12-01|
|0               |0          |16714-128|sinvastatin|3               |2015-10-01|30      |2015-06-01|2015-12-01|
+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+
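Window.partitionBy($"patient_uid").orderBy($"date") amounts to splitting the rows by patient and sorting each group by dispensation date. A plain-Scala sketch of that step on the sample rows, assuming a hypothetical InputRow case class (ndc and label left out) and a hypothetical PartitionByPatient object:

```scala
import java.time.LocalDate

// Hypothetical row type carrying the input columns used later (ndc and label omitted).
final case class InputRow(prescriptionUid: Int, patientUid: Int, dispensationUid: Int,
                          date: LocalDate, duration: Int, startDate: LocalDate, endDate: LocalDate)

object PartitionByPatient {
  private def d(s: String): LocalDate = LocalDate.parse(s)

  // The four sample rows from the input table.
  val sample: Seq[InputRow] = Seq(
    InputRow(0, 0, 0, d("2015-06-10"), 30, d("2015-06-01"), d("2015-12-01")),
    InputRow(0, 0, 1, d("2015-07-15"), 30, d("2015-06-01"), d("2015-12-01")),
    InputRow(0, 0, 2, d("2015-08-01"), 30, d("2015-06-01"), d("2015-12-01")),
    InputRow(0, 0, 3, d("2015-10-01"), 30, d("2015-06-01"), d("2015-12-01"))
  )

  // Plain-Scala analogue of partitionBy(patient_uid).orderBy(date): group by patient, then
  // sort each group by date. Sorting on toEpochDay (a Long) avoids relying on an implicit
  // Ordering[LocalDate].
  def partitions(rows: Seq[InputRow]): Map[Int, Seq[InputRow]] =
    rows.groupBy(_.patientUid).map { case (patient, rs) => patient -> rs.sortBy(_.date.toEpochDay) }
}
```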

Expected output:

+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+--------------------+------------------+--------------+------------+
|prescription_uid|patient_uid|ndc      |label      |dispensation_uid|date      |duration|start_date|end_date  |first_start_duration|first_end_duration|start_duration|end_duration|
+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+--------------------+------------------+--------------+------------+
|0               |0          |16714-128|sinvastatin|0               |2015-06-10|30      |2015-06-01|2015-12-01|2015-06-10          |2015-07-09        |2015-06-10    |2015-07-09  |
|0               |0          |16714-128|sinvastatin|1               |2015-07-15|30      |2015-06-01|2015-12-01|2015-06-10          |2015-07-09        |2015-07-15    |2015-08-13  |
|0               |0          |16714-128|sinvastatin|2               |2015-08-01|30      |2015-06-01|2015-12-01|2015-06-10          |2015-07-09        |2015-08-14    |2015-09-13  |
|0               |0          |16714-128|sinvastatin|3               |2015-10-01|30      |2015-06-01|2015-12-01|2015-06-10          |2015-07-09        |2015-10-01    |2015-10-30  |
+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+--------------------+------------------+--------------+------------+
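In the expected output, first_start_duration and first_end_duration repeat the first row's start_duration and end_duration on every row of the partition. The question does not show the firstStartDuration and firstEndDuration UDFs, so the sketch below only assumes values consistent with the sample: the earliest dispensation date, and the end_duration formula applied to that row (FirstDurations is a hypothetical name):

```scala
import java.time.LocalDate

object FirstDurations {
  // Returns (first_start_duration, first_end_duration) for one patient's (date, duration) rows.
  // Assumption: first_start_duration is the earliest dispensation date, i.e. what
  // first($"date").over(windowByPatient) yields, and first_end_duration applies the
  // end_duration formula to that first row. rows must be non-empty.
  def firstDurations(rows: Seq[(LocalDate, Int)], endDate: LocalDate): (LocalDate, LocalDate) = {
    val (firstStart, firstDuration) = rows.minBy(_._1.toEpochDay)
    val candidate = firstStart.plusDays(firstDuration - 1L)
    (firstStart, if (candidate.isBefore(endDate)) candidate else endDate)
  }
}
```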
    
Code tried:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"col" syntax; spark is the SparkSession

// firstStartDuration and firstEndDuration are UDFs that are not shown in the question.
val windowByPatient = Window.partitionBy($"patient_uid").orderBy($"date")
// Frame covering only the rows before the current one.
val windowByPatientBeforeCurrentRow = windowByPatient.rowsBetween(Window.unboundedPreceding, -1)
joinedPrDF = joinedPrDF
  .withColumn("first_start_duration", firstStartDuration(first($"date").over(windowByPatient), $"start_date"))
  .withColumn("first_end_duration", firstEndDuration($"first_start_duration", $"end_date", $"duration"))
  // First row of the partition: reuse first_start_duration; otherwise add the earlier durations.
  .withColumn("start_duration", when(count("*").over(windowByPatient) === 1, $"first_start_duration")
    .otherwise(startDurationCalc($"first_start_duration", $"date", $"start_date",
      coalesce(sum($"duration").over(windowByPatientBeforeCurrentRow), lit(0)))))
  .withColumn("end_duration", when(count("*").over(windowByPatient) === 1, $"first_end_duration")
    .otherwise(endDurationCalc($"end_date", $"start_duration", $"duration")))
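The attempted code relies on two window frames. Per the Spark Window documentation, an ordered window with no explicit frame uses a growing range frame (unbounded preceding to current row), so count("*").over(windowByPatient) is a running count that also includes rows sharing the current date; if two dispensations share a patient's first date, neither gets a count of 1. The rowsBetween(Window.unboundedPreceding, -1) frame covers only earlier rows, so sum is null on the first row, which is why the coalesce is needed. A plain-Scala model of both frames (FrameSemantics is a hypothetical name):

```scala
import java.time.LocalDate

object FrameSemantics {
  // Default frame of an ordered window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
  // A row's count includes every row whose date is <= its own date, ties included.
  def rangeRunningCounts(dates: Seq[LocalDate]): Seq[Int] = {
    val sorted = dates.sortBy(_.toEpochDay)
    sorted.map(d => sorted.count(x => !x.isAfter(d)))
  }

  // rowsBetween(Window.unboundedPreceding, -1): only the rows before the current one.
  // The first row's frame is empty and sum() returns null there, modelled here as None.
  def precedingSums(durations: Seq[Int]): Seq[Option[Long]] =
    durations.indices.map { i =>
      if (i == 0) None else Some(durations.take(i).map(_.toLong).sum)
    }
}
```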

UDFs:

// start_duration = max(first_start_duration + sum of earlier durations, current dispensation date)
val startDurationCalc = udf((firstStrtDur: java.sql.Date, currentDsDate: java.sql.Date,
                             prsStartDate: java.sql.Date, duration: Int) => {
  println("===" + firstStrtDur + "===" + currentDsDate + "===" + prsStartDate + "===" + duration)

  val startDate = java.sql.Date.valueOf(firstStrtDur.toLocalDate.plusDays(duration))
  if (startDate.after(currentDsDate)) {
    startDate
  } else {
    currentDsDate
  }
}: java.sql.Date)

// end_duration = min(prescription end_date, start_duration + duration - 1)
val endDurationCalc = udf((prsEndDate: java.sql.Date, startDuration: java.sql.Date, duration: Int) => {
  println("endDateCalcContRow ===" + prsEndDate + "===" + startDuration + "===" + duration)

  val currEndDate = java.sql.Date.valueOf(startDuration.toLocalDate.plusDays(duration - 1))
  if (currEndDate.before(prsEndDate)) {
    currEndDate
  } else {
    prsEndDate
  }
}: java.sql.Date)
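Tracing the attempted startDurationCalc by hand shows where it diverges. It measures from first_start_duration plus the sum of earlier durations, so it cannot see that row 1 started five days late (2015-07-15 instead of 2015-07-10). A plain-Scala mirror of that logic (AttemptedLogic is a hypothetical name):

```scala
import java.time.LocalDate

object AttemptedLogic {
  // Mirrors the question's startDurationCalc:
  // max(first_start_duration + sum of earlier durations, current dispensation date).
  def attemptedStart(firstStart: LocalDate, date: LocalDate, earlierDurationSum: Long): LocalDate = {
    val accumulated = firstStart.plusDays(earlierDurationSum)
    if (accumulated.isAfter(date)) accumulated else date
  }

  def main(args: Array[String]): Unit = {
    val firstStart = LocalDate.parse("2015-06-10")
    // Rows 1..3 of the sample with the earlier-duration sums the window would produce (30 days each).
    val rows = Seq(("2015-07-15", 30L), ("2015-08-01", 60L), ("2015-10-01", 90L))
    rows.foreach { case (d, sum) => println(attemptedStart(firstStart, LocalDate.parse(d), sum)) }
  }
}
```

For the sample rows this prints 2015-07-15, 2015-08-09 and 2015-10-01, whereas the expected start_duration for the third row is 2015-08-14.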

Recommended answer

Below is the final start-duration UDF, which uses the lag window function to get the previous row's duration and dispensation date:

// prevDsDate is null on a partition's first row (lag has no previous row), so this UDF
// presumably only runs from the second row on.
val startDurationCalc = udf((currentDsDate: java.sql.Date, prevDsDate: java.sql.Date, prevDuration: Int,
                             prsEndDate: java.sql.Date, firstStrtDur: java.sql.Date, acDuration: Int) => {
  println("startDurationCalc===currentDsDate===" + currentDsDate + "===prevDsDate===" + prevDsDate +
    "===prevDuration===" + prevDuration + "===prsEndDate===" + prsEndDate +
    "===firstStrtDur==" + firstStrtDur + "===acDuration===" + acDuration)
  // End of the previous dispensation, assuming it started on its own dispensation date.
  val prevDurEndDate = prevDsDate.toLocalDate.plusDays(prevDuration - 1)
  // Candidate start: the day after that end.
  var derivedDsDate = java.sql.Date.valueOf(prevDurEndDate.plusDays(1))
  // first_start_duration plus all durations dispensed before this row.
  val accumulatedDSDate = java.sql.Date.valueOf(firstStrtDur.toLocalDate.plusDays(acDuration))

  // Keep the later of the two candidates.
  if (derivedDsDate.before(accumulatedDSDate)) {
    derivedDsDate = accumulatedDSDate
  }

  if (derivedDsDate.after(prsEndDate)) {
    // Past the prescription end: return the day after end_date.
    java.sql.Date.valueOf(prsEndDate.toLocalDate.plusDays(1))
  } else if (currentDsDate.after(derivedDsDate)) {
    // A later dispensation date wins (start_duration = max(..., date)).
    currentDsDate
  } else {
    derivedDsDate
  }
}: java.sql.Date).asNondeterministic()
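The answer does not show how the UDF is wired into the DataFrame. Assuming prevDsDate and prevDuration come from lag($"date", 1) and lag($"duration", 1) over windowByPatient, acDuration is the sum of earlier durations (like windowByPatientBeforeCurrentRow in the question), and the first row still takes first_start_duration, a plain-Scala port (LagAnswerPort is a hypothetical name) reproduces the expected start_duration values for the sample:

```scala
import java.time.LocalDate

object LagAnswerPort {
  // Same decision logic as the answer's startDurationCalc, using java.time.LocalDate.
  def startDuration(current: LocalDate, prevDate: LocalDate, prevDuration: Int,
                    prsEndDate: LocalDate, firstStart: LocalDate, accDuration: Int): LocalDate = {
    // Day after the previous dispensation's end, assuming it started on its own date.
    val fromPrevious = prevDate.plusDays(prevDuration.toLong)
    // first_start_duration plus every duration dispensed before this row.
    val accumulated = firstStart.plusDays(accDuration.toLong)
    val derived = if (fromPrevious.isBefore(accumulated)) accumulated else fromPrevious
    if (derived.isAfter(prsEndDate)) prsEndDate.plusDays(1)
    else if (current.isAfter(derived)) current
    else derived
  }

  // Emulates lag(date, 1), lag(duration, 1) and a sum over the earlier rows for one
  // patient's non-empty rows sorted by date; returns start_duration for rows 2..n.
  def startsAfterFirst(rows: Seq[(LocalDate, Int)], prsEndDate: LocalDate): Seq[LocalDate] = {
    val firstStart = rows.head._1
    // earlierSums(i) = sum of the durations of rows 0 until i
    val earlierSums = rows.map(_._2).scanLeft(0)(_ + _)
    (1 until rows.length).map { i =>
      val (prevDate, prevDuration) = rows(i - 1)
      startDuration(rows(i)._1, prevDate, prevDuration, prsEndDate, firstStart, earlierSums(i))
    }
  }
}
```

Because the UDF looks at the previous row's dispensation date rather than its computed start_duration, it approximates the recurrence instead of implementing it. With dispensation dates 2015-06-10, 2015-07-15, 2015-07-20 and 2015-07-25 (30 days each), the port returns 2015-09-08 for the fourth row, while start_duration = max(previous end_duration + 1, date) gives 2015-09-13.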
