Fill empty cells with duplicates in a DataFrame


Question

I have a table similar to the following:

    +----------+----+--------------+-------------+
    |      Date|Hour|       Weather|Precipitation|
    +----------+----+--------------+-------------+
    |2013-07-01|   0|          null|         null|
    |2013-07-01|   3|          null|         null|
    |2013-07-01|   6|         clear|trace of p...|
    |2013-07-01|   9|          null|         null|
    |2013-07-01|  12|          null|         null|
    |2013-07-01|  15|          null|         null|
    |2013-07-01|  18|          rain|         null|
    |2013-07-01|  21|          null|         null|
    |2013-07-02|   0|          null|         null|
    |2013-07-02|   3|          null|         null|
    |2013-07-02|   6|          rain|low precip...|
    |2013-07-02|   9|          null|         null|
    |2013-07-02|  12|          null|         null|
    |2013-07-02|  15|          null|         null|
    |2013-07-02|  18|          null|         null|
    |2013-07-02|  21|          null|         null|
    +----------+----+--------------+-------------+

The idea is to fill the Weather column with the values observed at hours 6 and 18, and the Precipitation column with the value at hour 6. Since this table illustrates a DataFrame structure, simple iteration through it seems irrational. I tried something like this:

// _weather refers to the table shown above
def fillEmptyCells(): Unit = {
    val hourIndex = _weather.schema.fieldIndex("Hour")
    val dateIndex = _weather.schema.fieldIndex("Date")
    val weatherIndex = _weather.schema.fieldIndex("Weather")
    val precipitationIndex = _weather.schema.fieldIndex("Precipitation")

    val days = _weather.select("Date").distinct().rdd
    days.foreach(x => {
      val day = _weather.where(s"Date == '${x.getString(0)}'")
      val dayValues = day.where("Hour == 6").first()
      val weather = dayValues.getString(weatherIndex)
      val precipitation = dayValues.getString(precipitationIndex)
      day.rdd.map(y => (y(dateIndex), y(hourIndex), weather, precipitation))
    })
  }

However, this ugly piece of code smells because it iterates through the RDD instead of handling it in a distributed manner. It also has to assemble a new RDD or DataFrame from pieces, which can be problematic (I have no idea how to do this). Is there a more elegant and simple way to solve this task?

Answer

Assuming that you can easily create a timestamp column by combining Date and Hour, here is what I would do next:

  1. convert this timestamp (probably in milliseconds or seconds) into an hourTimestamp: .withColumn("hourTimestamp", $"timestamp" / 3600)?
  2. create 3 columns corresponding to the different possible hour lags (3, 6, 9)
  3. coalesce these 3 columns with the original one
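The arithmetic of step 1 can be sketched in plain Scala. The helper below is hypothetical (it is not part of the answer's Spark code); it combines a Date string and an Hour into epoch seconds, then integer-divides by 3600 to get an hour-granularity timestamp, so that consecutive rows of the sample table are exactly 3 units apart, even across midnight:

```scala
import java.time.LocalDate

// Hypothetical helper illustrating step 1: Date + Hour -> epoch seconds -> hourTimestamp.
def hourTimestamp(date: String, hour: Int): Long = {
  val epochSeconds = LocalDate.parse(date).toEpochDay * 86400L + hour * 3600L
  epochSeconds / 3600 // integer division: one unit per hour
}
```

Because rows three hours apart always differ by exactly 3 units, including across a day boundary, ordering the window by hourTimestamp (rather than by Hour) keeps the lags correct between days.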

Here is the code for Weather (do the same for Precipitation):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag}

val window = Window.orderBy("hourTimestamp")
val weatherUpdate = df
                    .withColumn("WeatherLag1", lag("Weather", 3).over(window))
                    .withColumn("WeatherLag2", lag("Weather", 6).over(window))
                    .withColumn("WeatherLag3", lag("Weather", 9).over(window))
                    .withColumn("Weather", coalesce($"Weather", $"WeatherLag1", $"WeatherLag2", $"WeatherLag3"))
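To see why coalescing lagged copies of a column fills the gaps, here is a minimal pure-Scala sketch (no Spark; the names and the row offsets 1–3 are illustrative, standing in for the answer's lags) applied to the sample Weather column, where each list position is one 3-hour row:

```scala
// Sample Weather column: one entry per 3-hour row, None for null.
val weather = Vector(None, None, Some("clear"), None, None, None, Some("rain"), None)

// Shift the column down by n rows, like Spark's lag(col, n) over an ordered window.
def lagged(xs: Vector[Option[String]], n: Int): Vector[Option[String]] =
  Vector.fill(n)(Option.empty[String]) ++ xs.dropRight(n)

// coalesce of the original column with its lags: the first non-null value wins.
val filled = weather.indices.toVector.map { i =>
  (0 to 3).iterator.map(k => lagged(weather, k)(i)).collectFirst { case Some(v) => v }
}
// filled: None, None, clear, clear, clear, clear, rain, rain
```

Each null row picks up the most recent earlier observation within three rows of it, which is exactly what the coalesce of WeatherLag1–WeatherLag3 with the original column achieves in the Spark version.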
