填充火花数据框列中的缺失日期 [英] Filling missing dates in spark dataframe column

查看:26
本文介绍了填充火花数据框列中的缺失日期的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个带有列的 spark 数据框 - timestamp 类型的日期"和 long 类型的数量".对于每个日期,我都有一些数量价值.日期按升序排列.但是有一些日期丢失了.例如——当前 df -

I've a spark data frame with columns - "date" of type timestamp and "quantity" of type long. For each date, I've some value for quantity. The dates are sorted in increasing order. But there are some dates which are missing. For eg - Current df -

Date        |    Quantity
10-09-2016  |    1
11-09-2016  |    2
14-09-2016  |    0
16-09-2016  |    1
17-09-2016  |    0
20-09-2016  |    2

如您所见,df 有一些缺失的日期,例如 12-09-2016、13-09-2016 等.我想在数量字段中为那些缺失的日期输入 0,这样生成的 df 应该看起来像 -

As you can see, the df has some missing dates like 12-09-2016, 13-09-2016 etc. I want to put 0 in the quantity field for those missing dates such that resultant df should look like -

Date        |    Quantity
10-09-2016  |    1
11-09-2016  |    2
12-09-2016  |    0
13-09-2016  |    0
14-09-2016  |    0
15-09-2016  |    0
16-09-2016  |    1
17-09-2016  |    0
18-09-2016  |    0
19-09-2016  |    0
20-09-2016  |    2

对此的任何帮助/建议将不胜感激.提前致谢.请注意,我正在使用 Scala 进行编码.

Any help/suggestion regarding this will be appreciated. Thanks in advance. Note that I am coding in scala.

推荐答案

为了便于理解代码,我以有点冗长的方式编写了这个答案.可以优化.

I have written this answer in a bit verbose way for easy understanding of the code. It can be optimized.

需要导入

import java.time.format.DateTimeFormatter
import java.time.{LocalDate, LocalDateTime}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, TimestampType}

用于字符串到有效日期格式的 UDF

 val date_transform = udf((date: String) => {
    val dtFormatter = DateTimeFormatter.ofPattern("d-M-y")
    val dt = LocalDate.parse(date, dtFormatter)
    "%4d-%2d-%2d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
      .replaceAll(" ", "0")
  })

以下 UDF 代码取自 迭代日期范围

Below UDF code taken from Iterate over dates range

  def fill_dates = udf((start: String, excludedDiff: Int) => {
    val dtFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    val fromDt = LocalDateTime.parse(start, dtFormatter)
    (1 to (excludedDiff - 1)).map(day => {
      val dt = fromDt.plusDays(day)
      "%4d-%2d-%2d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
        .replaceAll(" ", "0")
    })
  })

设置示例数据帧 (df)

val df = Seq(
      ("10-09-2016", 1),
      ("11-09-2016", 2),
      ("14-09-2016", 0),
      ("16-09-2016", 1),
      ("17-09-2016", 0),
      ("20-09-2016", 2)).toDF("date", "quantity")
      .withColumn("date", date_transform($"date").cast(TimestampType))
      .withColumn("quantity", $"quantity".cast(LongType))

df.printSchema()
root
 |-- date: timestamp (nullable = true)
 |-- quantity: long (nullable = false)


df.show()    
+-------------------+--------+
|               date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00|       1|
|2016-09-11 00:00:00|       2|
|2016-09-14 00:00:00|       0|
|2016-09-16 00:00:00|       1|
|2016-09-17 00:00:00|       0|
|2016-09-20 00:00:00|       2|
+-------------------+--------+

df创建一个临时数据帧(tempDf)到union:

Create a temporary dataframe(tempDf) to union with df:

val w = Window.orderBy($"date")
val tempDf = df.withColumn("diff", datediff(lead($"date", 1).over(w), $"date"))
  .filter($"diff" > 1) // Pick date diff more than one day to generate our date
  .withColumn("next_dates", fill_dates($"date", $"diff"))
  .withColumn("quantity", lit("0"))
  .withColumn("date", explode($"next_dates"))
  .withColumn("date", $"date".cast(TimestampType))

tempDf.show(false)
+-------------------+--------+----+------------------------+
|date               |quantity|diff|next_dates              |
+-------------------+--------+----+------------------------+
|2016-09-12 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
|2016-09-13 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
|2016-09-15 00:00:00|0       |2   |[2016-09-15]            |
|2016-09-18 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
|2016-09-19 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
+-------------------+--------+----+------------------------+

现在联合两个数据帧

val result = df.union(tempDf.select("date", "quantity"))
  .orderBy("date")

result.show()
+-------------------+--------+
|               date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00|       1|
|2016-09-11 00:00:00|       2|
|2016-09-12 00:00:00|       0|
|2016-09-13 00:00:00|       0|
|2016-09-14 00:00:00|       0|
|2016-09-15 00:00:00|       0|
|2016-09-16 00:00:00|       1|
|2016-09-17 00:00:00|       0|
|2016-09-18 00:00:00|       0|
|2016-09-19 00:00:00|       0|
|2016-09-20 00:00:00|       2|
+-------------------+--------+

这篇关于填充火花数据框列中的缺失日期的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆