填充火花数据框列中的缺失日期 [英] Filling missing dates in spark dataframe column
问题描述
我有一个带有列的 spark 数据框 - timestamp
类型的日期"和 long
类型的数量".对于每个日期,我都有一些数量价值.日期按升序排列.但是有一些日期丢失了.例如——当前 df -
I've a spark data frame with columns - "date" of type timestamp
and "quantity" of type long
. For each date, I've some value for quantity. The dates are sorted in increasing order. But there are some dates which are missing.
For eg -
Current df -
Date | Quantity
10-09-2016 | 1
11-09-2016 | 2
14-09-2016 | 0
16-09-2016 | 1
17-09-2016 | 0
20-09-2016 | 2
如您所见,df 有一些缺失的日期,例如 12-09-2016、13-09-2016 等.我想在数量字段中为那些缺失的日期输入 0,这样生成的 df 应该看起来像 -
As you can see, the df has some missing dates like 12-09-2016, 13-09-2016 etc. I want to put 0 in the quantity field for those missing dates such that resultant df should look like -
Date | Quantity
10-09-2016 | 1
11-09-2016 | 2
12-09-2016 | 0
13-09-2016 | 0
14-09-2016 | 0
15-09-2016 | 0
16-09-2016 | 1
17-09-2016 | 0
18-09-2016 | 0
19-09-2016 | 0
20-09-2016 | 2
对此的任何帮助/建议将不胜感激.提前致谢.请注意,我正在使用 Scala 进行编码.
Any help/suggestion regarding this will be appreciated. Thanks in advance. Note that I am coding in scala.
推荐答案
为了便于理解代码,我以有点冗长的方式编写了这个答案.可以优化.
I have written this answer in a bit verbose way for easy understanding of the code. It can be optimized.
需要导入
import java.time.format.DateTimeFormatter
import java.time.{LocalDate, LocalDateTime}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, TimestampType}
用于字符串到有效日期格式的 UDF
val date_transform = udf((date: String) => {
val dtFormatter = DateTimeFormatter.ofPattern("d-M-y")
val dt = LocalDate.parse(date, dtFormatter)
"%4d-%2d-%2d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
.replaceAll(" ", "0")
})
以下 UDF 代码取自 迭代日期范围
Below UDF code taken from Iterate over dates range
def fill_dates = udf((start: String, excludedDiff: Int) => {
val dtFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
val fromDt = LocalDateTime.parse(start, dtFormatter)
(1 to (excludedDiff - 1)).map(day => {
val dt = fromDt.plusDays(day)
"%4d-%2d-%2d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
.replaceAll(" ", "0")
})
})
设置示例数据帧 (df
)
val df = Seq(
("10-09-2016", 1),
("11-09-2016", 2),
("14-09-2016", 0),
("16-09-2016", 1),
("17-09-2016", 0),
("20-09-2016", 2)).toDF("date", "quantity")
.withColumn("date", date_transform($"date").cast(TimestampType))
.withColumn("quantity", $"quantity".cast(LongType))
df.printSchema()
root
|-- date: timestamp (nullable = true)
|-- quantity: long (nullable = false)
df.show()
+-------------------+--------+
| date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00| 1|
|2016-09-11 00:00:00| 2|
|2016-09-14 00:00:00| 0|
|2016-09-16 00:00:00| 1|
|2016-09-17 00:00:00| 0|
|2016-09-20 00:00:00| 2|
+-------------------+--------+
用df
创建一个临时数据帧(tempDf
)到union
:
Create a temporary dataframe(tempDf
) to union
with df
:
val w = Window.orderBy($"date")
val tempDf = df.withColumn("diff", datediff(lead($"date", 1).over(w), $"date"))
.filter($"diff" > 1) // Pick date diff more than one day to generate our date
.withColumn("next_dates", fill_dates($"date", $"diff"))
.withColumn("quantity", lit("0"))
.withColumn("date", explode($"next_dates"))
.withColumn("date", $"date".cast(TimestampType))
tempDf.show(false)
+-------------------+--------+----+------------------------+
|date |quantity|diff|next_dates |
+-------------------+--------+----+------------------------+
|2016-09-12 00:00:00|0 |3 |[2016-09-12, 2016-09-13]|
|2016-09-13 00:00:00|0 |3 |[2016-09-12, 2016-09-13]|
|2016-09-15 00:00:00|0 |2 |[2016-09-15] |
|2016-09-18 00:00:00|0 |3 |[2016-09-18, 2016-09-19]|
|2016-09-19 00:00:00|0 |3 |[2016-09-18, 2016-09-19]|
+-------------------+--------+----+------------------------+
现在联合两个数据帧
val result = df.union(tempDf.select("date", "quantity"))
.orderBy("date")
result.show()
+-------------------+--------+
| date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00| 1|
|2016-09-11 00:00:00| 2|
|2016-09-12 00:00:00| 0|
|2016-09-13 00:00:00| 0|
|2016-09-14 00:00:00| 0|
|2016-09-15 00:00:00| 0|
|2016-09-16 00:00:00| 1|
|2016-09-17 00:00:00| 0|
|2016-09-18 00:00:00| 0|
|2016-09-19 00:00:00| 0|
|2016-09-20 00:00:00| 2|
+-------------------+--------+
这篇关于填充火花数据框列中的缺失日期的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!