向前填充新行以说明缺少的日期 [英] Forward Fill New Row to Account for Missing Dates

查看:26
本文介绍了向前填充新行以说明缺少的日期的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我目前有一个数据集,由变量聚合器"分组为每小时增量.这个每小时数据存在差距,我理想情况下想要做的是用映射到 x 列中变量的前一行向前填充行.

I currently have a dataset grouped into hourly increments by a variable "aggregator". There are gaps in this hourly data and what i would ideally like to do is forward fill the rows with the prior row which maps to the variable in column x.

我已经看到一些使用 PANDAS 解决类似问题的解决方案,但理想情况下,我想了解如何最好地使用 pyspark UDF 来解决这个问题.

I've seen some solutions to similar problems using PANDAS but ideally i would like to understand how best to approach this with a pyspark UDF.

我最初在 PANDAS 中考虑过类似以下的事情,但也很难实现这一点,只是将忽略聚合器作为第一遍:

I'd initially thought about something like the following with PANDAS but also struggled to implement this to just fill ignoring the aggregator as a first pass:

df = df.set_index(keys=[df.timestamp]).resample('1H', fill_method='ffill')

但理想情况下,我想避免使用 PANDAS.

But ideally i'd like to avoid using PANDAS.

在下面的示例中,我缺少两行每小时数据(标记为 MISSING).

In the example below i have two missing rows of hourly data (labeled as MISSING).

| timestamp            | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A          |
| 2018-12-27T10:00:00Z | A          |
| MISSING              | MISSING    |
| 2018-12-27T12:00:00Z | A          |
| 2018-12-27T13:00:00Z | A          |
| 2018-12-27T09:00:00Z | B          |
| 2018-12-27T10:00:00Z | B          |
| 2018-12-27T11:00:00Z | B          |
| MISSING              | MISSING    |
| 2018-12-27T13:00:00Z | B          |
| 2018-12-27T14:00:00Z | B          |

此处的预期输出如下:

| timestamp            | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A          |
| 2018-12-27T10:00:00Z | A          |
| 2018-12-27T11:00:00Z | A          |
| 2018-12-27T12:00:00Z | A          |
| 2018-12-27T13:00:00Z | A          |
| 2018-12-27T09:00:00Z | B          |
| 2018-12-27T10:00:00Z | B          |
| 2018-12-27T11:00:00Z | B          |
| 2018-12-27T12:00:00Z | B          |
| 2018-12-27T13:00:00Z | B          |
| 2018-12-27T14:00:00Z | B          |

感谢您的帮助.

谢谢.

推荐答案

这是填补缺失时间的解决方案.使用 windows、lag 和 udf.只需稍加修改,它也可以延长至数天.

Here is the solution, to fill the missing hours. using windows, lag and udf. With little modification it can extend to days as well.

from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql.functions import *
from dateutil.relativedelta import relativedelta

def missing_hours(t1, t2):
    return [t1 + relativedelta(hours=-x) for x in range(1, t1.hour-t2.hour)]

missing_hours_udf = udf(missing_hours, ArrayType(TimestampType()))

df = spark.read.csv('dates.csv',header=True,inferSchema=True)

window = Window.partitionBy("aggregator").orderBy("timestamp")

df_mising = df.withColumn("prev_timestamp",lag(col("timestamp"),1, None).over(window))\
       .filter(col("prev_timestamp").isNotNull())\
       .withColumn("timestamp", explode(missing_hours_udf(col("timestamp"), col("prev_timestamp"))))\
       .drop("prev_timestamp")

df.union(df_mising).orderBy("aggregator","timestamp").show()

结果

+-------------------+----------+
|          timestamp|aggregator|
+-------------------+----------+
|2018-12-27 09:00:00|         A|
|2018-12-27 10:00:00|         A|
|2018-12-27 11:00:00|         A|
|2018-12-27 12:00:00|         A|
|2018-12-27 13:00:00|         A|
|2018-12-27 09:00:00|         B|
|2018-12-27 10:00:00|         B|
|2018-12-27 11:00:00|         B|
|2018-12-27 12:00:00|         B|
|2018-12-27 13:00:00|         B|
|2018-12-27 14:00:00|         B|
+-------------------+----------+

这篇关于向前填充新行以说明缺少的日期的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆