How to apply a window function in an in-memory transformation with a new column in Scala
Problem description
I have a DataFrame that I want to transform into the output below. Each row's start_duration and end_duration are derived from the previous row's start_duration and end_duration. How can I achieve this in Spark using Scala?
Below are the formulas for start_duration and end_duration:
start_duration = max(previous end_duration + 1, current row's date)
end_duration = min(prescription end_date, start_duration + duration - 1)
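In these formulas, start_duration depends on the previous row's computed end_duration, not on a column that already exists in the input. The two formulas therefore form a row-by-row recurrence. A window function such as lag can only read existing column values of the previous row. The following is a minimal plain-Scala sketch of the recurrence for one patient's rows. It assumes the rows are processed in date order and that the first row starts on its own date. DurationRecurrence, Dispensation and compute are hypothetical names, not part of Spark.

```scala
import java.time.LocalDate

object DurationRecurrence {
  // One dispensation row, reduced to the two fields the formulas need.
  final case class Dispensation(date: LocalDate, duration: Int)

  // Applies the formulas row by row for one patient/prescription and returns
  // (start_duration, end_duration) for each row, in date order.
  // Assumption: the first row has no previous end_duration, so it starts on its own date.
  def compute(rows: Seq[Dispensation], prescriptionEnd: LocalDate): Seq[(LocalDate, LocalDate)] = {
    val sorted = rows.sortBy(_.date.toEpochDay)
    val (result, _) = sorted.foldLeft((Vector.empty[(LocalDate, LocalDate)], Option.empty[LocalDate])) {
      case ((acc, prevEnd), row) =>
        // start_duration = max(previous end_duration + 1, current date)
        val start = prevEnd.map(_.plusDays(1)) match {
          case Some(candidate) if candidate.isAfter(row.date) => candidate
          case _                                              => row.date
        }
        // end_duration = min(prescription end_date, start_duration + duration - 1)
        val naturalEnd = start.plusDays(row.duration - 1L)
        val end = if (naturalEnd.isBefore(prescriptionEnd)) naturalEnd else prescriptionEnd
        (acc :+ (start -> end), Some(end))
    }
    result
  }
}
```

The sample input below has prescription end_date 2015-12-01. Applied literally to it, the sketch yields these (start_duration, end_duration) pairs:

- 2015-06-10 / 2015-07-09
- 2015-07-15 / 2015-08-13
- 2015-08-14 / 2015-09-12
- 2015-10-01 / 2015-10-30

The third end_duration comes out one day earlier than the 2015-09-13 shown in the expected output. In Spark, one option is to run a function like this once per patient, for example with Dataset.groupByKey followed by flatMapGroups, instead of through a window expression.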
Below is my input DataFrame:
+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+
|prescription_uid|patient_uid|ndc      |label      |dispensation_uid|date      |duration|start_date|end_date  |
+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+
|0               |0          |16714-128|sinvastatin|0               |2015-06-10|30      |2015-06-01|2015-12-01|
|0               |0          |16714-128|sinvastatin|1               |2015-07-15|30      |2015-06-01|2015-12-01|
|0               |0          |16714-128|sinvastatin|2               |2015-08-01|30      |2015-06-01|2015-12-01|
|0               |0          |16714-128|sinvastatin|3               |2015-10-01|30      |2015-06-01|2015-12-01|
+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+
EXPECTED RESULT:
+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+--------------------+------------------+--------------+------------+
|prescription_uid|patient_uid|ndc      |label      |dispensation_uid|date      |duration|start_date|end_date  |first_start_duration|first_end_duration|start_duration|end_duration|
+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+--------------------+------------------+--------------+------------+
|0               |0          |16714-128|sinvastatin|0               |2015-06-10|30      |2015-06-01|2015-12-01|2015-06-10          |2015-07-09        |2015-06-10    |2015-07-09  |
|0               |0          |16714-128|sinvastatin|1               |2015-07-15|30      |2015-06-01|2015-12-01|2015-06-10          |2015-07-09        |2015-07-15    |2015-08-13  |
|0               |0          |16714-128|sinvastatin|2               |2015-08-01|30      |2015-06-01|2015-12-01|2015-06-10          |2015-07-09        |2015-08-14    |2015-09-13  |
|0               |0          |16714-128|sinvastatin|3               |2015-10-01|30      |2015-06-01|2015-12-01|2015-06-10          |2015-07-09        |2015-10-01    |2015-10-30  |
+----------------+-----------+---------+-----------+----------------+----------+--------+----------+----------+--------------------+------------------+--------------+------------+
Code tried:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // enables $"col"; assumes a SparkSession named spark

val windowByPatient = Window.partitionBy($"patient_uid").orderBy($"date")
val windowByPatientBeforeCurrentRow = windowByPatient.rowsBetween(Window.unboundedPreceding, -1)

// joinedPrDF is a var holding the input DataFrame; firstStartDuration and
// firstEndDuration are further UDFs that are not shown in the question.
joinedPrDF = joinedPrDF
  .withColumn("first_start_duration", firstStartDuration(first($"date").over(windowByPatient), $"start_date"))
  .withColumn("first_end_duration", firstEndDuration($"first_start_duration", $"end_date", $"duration"))
  .withColumn("start_duration", when(count("*").over(windowByPatient) === 1, $"first_start_duration")
    .otherwise(startDurationCalc($"first_start_duration", $"date", $"start_date",
      coalesce(sum($"duration").over(windowByPatientBeforeCurrentRow), lit(0)))))
  .withColumn("end_duration", when(count("*").over(windowByPatient) === 1, $"first_end_duration")
    .otherwise(endDurationCalc($"end_date", $"start_duration", $"duration")))
UDFs:
import org.apache.spark.sql.functions.udf

val startDurationCalc = udf((firstStrtDur: java.sql.Date, currentDsDate: java.sql.Date,
                             prsStartDate: java.sql.Date, duration: Int) => {
  println("===" + firstStrtDur + "===" + currentDsDate + "===" + prsStartDate + "===" + duration)
  // first_start_duration shifted by the summed durations of all earlier rows
  val startDate = java.sql.Date.valueOf(firstStrtDur.toLocalDate.plusDays(duration))
  // keep the later of the shifted date and the current dispensation date
  if (startDate.after(currentDsDate)) startDate else currentDsDate
}: java.sql.Date)

val endDurationCalc = udf((prsEndDate: java.sql.Date, startDuration: java.sql.Date, duration: Int) => {
  println("endDateCalcContRow ===" + prsEndDate + "===" + startDuration + "===" + duration)
  // start_duration + duration - 1, capped at the prescription end_date
  val currEndDate = java.sql.Date.valueOf(startDuration.toLocalDate.plusDays(duration - 1))
  if (currEndDate.before(prsEndDate)) currEndDate else prsEndDate
}: java.sql.Date)
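The following plain-Scala re-creation of the attempt's start_duration logic runs outside Spark. CumulativeSumAttempt and startDurations are hypothetical names. The sketch suggests why the attempt drifts: it shifts first_start_duration by the running sum of earlier durations. As a result, the five-day gap before dispensation 1 (2015-07-10 versus 2015-07-15) is never carried forward to later rows.

```scala
import java.time.LocalDate

object CumulativeSumAttempt {
  // Mirrors the question's startDurationCalc: the later of
  // (first_start_duration + sum of all earlier durations) and the row's own date.
  // rows are (date, duration) pairs for one patient, sorted by date.
  def startDurations(firstStart: LocalDate, rows: Seq[(LocalDate, Int)]): Seq[LocalDate] = {
    val previousSums = rows.map(_._2).scanLeft(0)(_ + _) // 0, d0, d0 + d1, ...
    rows.zip(previousSums).map { case ((date, _), prevSum) =>
      val shifted = firstStart.plusDays(prevSum.toLong)
      if (shifted.isAfter(date)) shifted else date
    }
  }
}
```

On the sample rows this gives 2015-08-09 for dispensation 2, whereas the expected start_duration is 2015-08-14.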
Recommended answer
Below is the final start_duration calculator. It uses the lag window function to get the previous row's duration and dispensation date:
import org.apache.spark.sql.functions.udf

val startDurationCalc = udf((currentDsDate: java.sql.Date, prevDsDate: java.sql.Date, prevDuration: Int,
                             prsEndDate: java.sql.Date, firstStrtDur: java.sql.Date, acDuration: Int) => {
  println("startDurationCalc===currentDsDate===" + currentDsDate + "===prevDsDate===" + prevDsDate +
    "===prevDuration===" + prevDuration + "===prsEndDate===" + prsEndDate + "===firstStrtDur==" + firstStrtDur +
    "===acDuration===" + acDuration)
  // Despite its name, this is prevDsDate + prevDuration - 1, the previous dispensation's end
  val prevDurStartDate = prevDsDate.toLocalDate.plusDays(prevDuration - 1)
  // Candidate start: the day after that end
  var derivedDsDate = java.sql.Date.valueOf(prevDurStartDate.plusDays(1))
  // Alternative candidate: first_start_duration shifted by the accumulated duration
  val accumulatedDSDate = java.sql.Date.valueOf(firstStrtDur.toLocalDate.plusDays(acDuration))
  if (derivedDsDate.before(accumulatedDSDate)) {
    derivedDsDate = accumulatedDSDate
  }
  if (derivedDsDate.after(prsEndDate)) {
    // Past the prescription end_date: return the day after it
    java.sql.Date.valueOf(prsEndDate.toLocalDate.plusDays(1))
  } else if (currentDsDate.after(derivedDsDate)) {
    currentDsDate
  } else {
    derivedDsDate
  }
}: java.sql.Date).asNondeterministic()
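The answer does not show how the lag columns are wired into the UDF. As an illustration only, the following plain-Scala port feeds the UDF's logic the previous row's date and duration, as lag(date, 1) and lag(duration, 1) would. It uses java.time instead of java.sql.Date, and LagBasedStart, start and startDurations are hypothetical names. The port makes two assumptions:

- acDuration is the sum of all earlier durations.
- The first row, where lag returns null, keeps first_start_duration.

```scala
import java.time.LocalDate

object LagBasedStart {
  // Plain-Scala port of the answer's startDurationCalc (java.time instead of java.sql.Date).
  // prevDate and prevDuration stand in for lag(date, 1) and lag(duration, 1).
  // Assumption: accumulatedDuration is the sum of all earlier durations for the patient.
  def start(currentDate: LocalDate, prevDate: LocalDate, prevDuration: Int,
            prescriptionEnd: LocalDate, firstStart: LocalDate, accumulatedDuration: Int): LocalDate = {
    val fromPrevious = prevDate.plusDays(prevDuration.toLong) // previous date + previous duration
    val fromAccumulated = firstStart.plusDays(accumulatedDuration.toLong)
    val derived = if (fromPrevious.isBefore(fromAccumulated)) fromAccumulated else fromPrevious
    if (derived.isAfter(prescriptionEnd)) prescriptionEnd.plusDays(1) // same clipping as the answer
    else if (currentDate.isAfter(derived)) currentDate
    else derived
  }

  // Emulates the window for one patient, rows sorted by date.
  // Assumption: the first row (where lag is null) keeps first_start_duration, taken here as its own date.
  def startDurations(rows: Seq[(LocalDate, Int)], prescriptionEnd: LocalDate): Seq[LocalDate] =
    if (rows.isEmpty) Seq.empty
    else {
      val firstStart = rows.head._1
      val previousSums = rows.map(_._2).scanLeft(0)(_ + _) // previousSums(i) = sum of durations before row i
      rows.indices.map { i =>
        if (i == 0) firstStart
        else start(rows(i)._1, rows(i - 1)._1, rows(i - 1)._2, prescriptionEnd, firstStart, previousSums(i))
      }
    }
}
```

On the sample input this reproduces the expected start_duration column: 2015-06-10, 2015-07-15, 2015-08-14, 2015-10-01. However, it reads the previous row's input date rather than its computed start_duration. It is therefore not the same recurrence as the formulas in the question and can give different results on other data, for example when a gap is followed by overlapping dispensations.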