Fill empty cells with duplicates in a DataFrame
Question
I have a table similar to the following:
+----------+----+--------------+-------------+
| Date|Hour| Weather|Precipitation|
+----------+----+--------------+-------------+
|2013-07-01| 0| null| null|
|2013-07-01| 3| null| null|
|2013-07-01| 6| clear|trace of p...|
|2013-07-01| 9| null| null|
|2013-07-01| 12| null| null|
|2013-07-01| 15| null| null|
|2013-07-01| 18| rain| null|
|2013-07-01| 21| null| null|
|2013-07-02| 0| null| null|
|2013-07-02| 3| null| null|
|2013-07-02| 6| rain|low precip...|
|2013-07-02| 9| null| null|
|2013-07-02| 12| null| null|
|2013-07-02| 15| null| null|
|2013-07-02| 18| null| null|
|2013-07-02| 21| null| null|
+----------+----+--------------+-------------+
The idea is to fill the Weather and Precipitation columns with the values present at 6 and 18 hours, and at 6 hours, respectively. Since this table illustrates a DataFrame structure, simple iteration through it seems irrational. I tried something like this:
// _weather refers to the table mentioned above
def fillEmptyCells(): Unit = {
  val hourIndex = _weather.schema.fieldIndex("Hour")
  val dateIndex = _weather.schema.fieldIndex("Date")
  val weatherIndex = _weather.schema.fieldIndex("Weather")
  val precipitationIndex = _weather.schema.fieldIndex("Precipitation")
  val days = _weather.select("Date").distinct().rdd
  days.foreach(x => {
    val day = _weather.where(s"Date == '${x(0)}'")
    val dayValues = day.where("Hour == 6").first()
    val weather = dayValues.getString(weatherIndex)
    val precipitation = dayValues.getString(precipitationIndex)
    day.rdd.map(y => (y(0), y(1), weather, precipitation))
  })
}
However, this ugly piece of code seems to smell because it iterates through an RDD instead of handling it in a distributed manner. It also has to form a new RDD or DataFrame from pieces, which can be problematic (I have no idea how to do this). Is there a more elegant and simple way to solve this task?
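For reference, what the loop above is meant to produce can be sketched in plain Python (not Spark); the fill_day helper and the sample rows below are hypothetical:

```python
# Hypothetical pure-Python sketch (not Spark): for each date, read the
# Weather/Precipitation values from the Hour == 6 row and copy them onto
# every row of that day, as the Scala loop above attempts.
rows = [
    ("2013-07-01", 0, None, None),
    ("2013-07-01", 3, None, None),
    ("2013-07-01", 6, "clear", "trace"),
    ("2013-07-01", 9, None, None),
]

def fill_day(rows):
    # Collect each day's reference values from its Hour == 6 row.
    by_date = {date: (w, p) for date, hour, w, p in rows if hour == 6}
    # Overwrite every row of the day; days without an Hour == 6 reading
    # keep their original values.
    return [(d, h, *by_date.get(d, (w, p))) for d, h, w, p in rows]
```

Running fill_day(rows) puts "clear" and "trace" on all four sample rows.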
Answer
Assuming that you can easily create a timestamp column by combining Date and Hour, what I would do next is:
- convert this timestamp (probably in milliseconds or seconds) into an hourTimestamp: .withColumn("hourTimestamp", $"timestamp" / 3600)?
- create 3 columns corresponding to the different possible hour lags (3, 6, 9)
- coalesce these 3 columns + the original one
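As a sanity check on the first step, here is a pure-Python sketch (not Spark; the hour_timestamp helper is hypothetical) of combining Date and Hour into an hour-resolution timestamp:

```python
# Hypothetical pure-Python sketch of step 1: combine Date and Hour into
# an integer count of hours since the epoch (the "hourTimestamp").
from datetime import datetime, timezone

def hour_timestamp(date_str, hour):
    epoch_seconds = (datetime.strptime(date_str, "%Y-%m-%d")
                     .replace(tzinfo=timezone.utc).timestamp()) + hour * 3600
    return int(epoch_seconds // 3600)
```

Consecutive 3-hour rows then differ by exactly 3, even across a day boundary, which is what makes the lag columns in the next step line up.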
Here is the code for Weather (do the same for Precipitation):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag}

// A global (unpartitioned) window; Spark warns that this moves all rows
// to a single partition, which is acceptable for a dataset of this size.
val window = Window.orderBy("hourTimestamp")

// lag() counts rows, not hours: with one row every 3 hours, row offsets
// 1, 2 and 3 correspond to the hour lags 3, 6 and 9.
val weatherUpdate = df
  .withColumn("WeatherLag1", lag("Weather", 1).over(window))
  .withColumn("WeatherLag2", lag("Weather", 2).over(window))
  .withColumn("WeatherLag3", lag("Weather", 3).over(window))
  .withColumn("Weather", coalesce($"Weather", $"WeatherLag1", $"WeatherLag2", $"WeatherLag3"))
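To see what the lag/coalesce pattern does, it can be emulated in plain Python (the helpers and sample values below are hypothetical, not Spark APIs). Note that Spark's lag() offset counts rows, so with one row every 3 hours a row offset of 1 is a 3-hour lag:

```python
# Hypothetical pure-Python emulation (not Spark) of lag + coalesce.
def lag(values, offset):
    # Value `offset` rows earlier in the ordered column, else None.
    return [values[i - offset] if i >= offset else None
            for i in range(len(values))]

def coalesce(*columns):
    # First non-None value per row, like Spark's coalesce().
    return [next((v for v in row if v is not None), None)
            for row in zip(*columns)]

# One value per 3-hour row, mirroring the Weather column in the question.
weather = [None, None, "clear", None, None, None, "rain", None]
filled = coalesce(weather, lag(weather, 1), lag(weather, 2), lag(weather, 3))
# "clear" (row 2) now also covers rows 3-5, and "rain" (row 6) covers row 7.
```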