在Spark DataFrame列中填充缺少的日期 [英] Filling missing dates in spark dataframe column

查看:464
本文介绍了在Spark DataFrame列中填充缺少的日期的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个带有列的Spark数据框-类型为timestamp的日期"和类型为long的数量".对于每个日期,我都有一些价值.日期按升序排序.但是,有些日期缺少. 例如- 当前df-

I've a spark data frame with columns - "date" of type timestamp and "quantity" of type long. For each date, I've some value for quantity. The dates are sorted in increasing order. But there are some dates which are missing. For eg - Current df -

Date        |    Quantity
10-09-2016  |    1
11-09-2016  |    2
14-09-2016  |    0
16-09-2016  |    1
17-09-2016  |    0
20-09-2016  |    2

如您所见,df缺少一些日期,例如12-09-2016、13-09-2016等.我想在数量字段中输入0来填写那些缺失的日期,以使结果df看起来像--

As you can see, the df has some missing dates like 12-09-2016, 13-09-2016 etc. I want to put 0 in the quantity field for those missing dates such that resultant df should look like -

Date        |    Quantity
10-09-2016  |    1
11-09-2016  |    2
12-09-2016  |    0
13-09-2016  |    0
14-09-2016  |    0
15-09-2016  |    0
16-09-2016  |    1
17-09-2016  |    0
18-09-2016  |    0
19-09-2016  |    0
20-09-2016  |    2

任何对此的帮助/建议将不胜感激.提前致谢. 请注意,我正在使用Scala进行编码.

Any help/suggestion regarding this will be appreciated. Thanks in advance. Note that I am coding in scala.

推荐答案

为了方便理解代码,我以冗长的方式编写了此答案.可以优化.

I have written this answer in bit verbose way for easy understanding of the code. It can be optimized.

需要进口

import java.time.format.DateTimeFormatter
import java.time.{LocalDate, LocalDateTime}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, TimestampType}

将字符串转换为有效日期格式的UDF

 val date_transform = udf((date: String) => {
    val dtFormatter = DateTimeFormatter.ofPattern("d-M-y")
    val dt = LocalDate.parse(date, dtFormatter)
    "%4d-%2d-%2d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
      .replaceAll(" ", "0")
  })

迭代日期范围内的UDF代码下方

  def fill_dates = udf((start: String, excludedDiff: Int) => {
    val dtFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    val fromDt = LocalDateTime.parse(start, dtFormatter)
    (1 to (excludedDiff - 1)).map(day => {
      val dt = fromDt.plusDays(day)
      "%4d-%2d-%2d".format(dt.getYear, dt.getMonthValue, dt.getDayOfMonth)
        .replaceAll(" ", "0")
    })
  })

设置示例数据框(df)

Setting up sample dataframe (df)

val df = Seq(
      ("10-09-2016", 1),
      ("11-09-2016", 2),
      ("14-09-2016", 0),
      ("16-09-2016", 1),
      ("17-09-2016", 0),
      ("20-09-2016", 2)).toDF("date", "quantity")
      .withColumn("date", date_transform($"date").cast(TimestampType))
      .withColumn("quantity", $"quantity".cast(LongType))

df.printSchema()
root
 |-- date: timestamp (nullable = true)
 |-- quantity: long (nullable = false)


df.show()    
+-------------------+--------+
|               date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00|       1|
|2016-09-11 00:00:00|       2|
|2016-09-14 00:00:00|       0|
|2016-09-16 00:00:00|       1|
|2016-09-17 00:00:00|       0|
|2016-09-20 00:00:00|       2|
+-------------------+--------+

使用dfunion创建一个临时数据框(tempDf):

Create a temporary dataframe(tempDf) to union with df:

val w = Window.orderBy($"date")
val tempDf = df.withColumn("diff", datediff(lead($"date", 1).over(w), $"date"))
  .filter($"diff" > 1) // Pick date diff more than one day to generate our date
  .withColumn("next_dates", fill_dates($"date", $"diff"))
  .withColumn("quantity", lit("0"))
  .withColumn("date", explode($"next_dates"))
  .withColumn("date", $"date".cast(TimestampType))

tempDf.show(false)
+-------------------+--------+----+------------------------+
|date               |quantity|diff|next_dates              |
+-------------------+--------+----+------------------------+
|2016-09-12 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
|2016-09-13 00:00:00|0       |3   |[2016-09-12, 2016-09-13]|
|2016-09-15 00:00:00|0       |2   |[2016-09-15]            |
|2016-09-18 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
|2016-09-19 00:00:00|0       |3   |[2016-09-18, 2016-09-19]|
+-------------------+--------+----+------------------------+

现在合并两个数据框

val result = df.union(tempDf.select("date", "quantity"))
  .orderBy("date")

result.show()
+-------------------+--------+
|               date|quantity|
+-------------------+--------+
|2016-09-10 00:00:00|       1|
|2016-09-11 00:00:00|       2|
|2016-09-12 00:00:00|       0|
|2016-09-13 00:00:00|       0|
|2016-09-14 00:00:00|       0|
|2016-09-15 00:00:00|       0|
|2016-09-16 00:00:00|       1|
|2016-09-17 00:00:00|       0|
|2016-09-18 00:00:00|       0|
|2016-09-19 00:00:00|       0|
|2016-09-20 00:00:00|       2|
+-------------------+--------+

这篇关于在Spark DataFrame列中填充缺少的日期的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆