Forward Fill New Row to Account for Missing Dates

Question

I currently have a dataset grouped into hourly increments by a variable, "aggregator". There are gaps in this hourly data, and ideally I would like to fill each gap with the prior row that maps to the variable in column x.

I've seen some solutions to similar problems using pandas, but ideally I would like to understand how best to approach this with a PySpark UDF.

I'd initially thought about something like the following with pandas, but I also struggled to implement it, even when ignoring the aggregator as a first pass:

df = df.set_index(keys=[df.timestamp]).resample('1H', fill_method='ffill')

But ideally I'd like to avoid using pandas.

In the example below, two rows of hourly data are missing (labeled MISSING).

| timestamp            | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A          |
| 2018-12-27T10:00:00Z | A          |
| MISSING              | MISSING    |
| 2018-12-27T12:00:00Z | A          |
| 2018-12-27T13:00:00Z | A          |
| 2018-12-27T09:00:00Z | B          |
| 2018-12-27T10:00:00Z | B          |
| 2018-12-27T11:00:00Z | B          |
| MISSING              | MISSING    |
| 2018-12-27T13:00:00Z | B          |
| 2018-12-27T14:00:00Z | B          |

The expected output here would be as follows:

| timestamp            | aggregator |
|----------------------|------------|
| 2018-12-27T09:00:00Z | A          |
| 2018-12-27T10:00:00Z | A          |
| 2018-12-27T11:00:00Z | A          |
| 2018-12-27T12:00:00Z | A          |
| 2018-12-27T13:00:00Z | A          |
| 2018-12-27T09:00:00Z | B          |
| 2018-12-27T10:00:00Z | B          |
| 2018-12-27T11:00:00Z | B          |
| 2018-12-27T12:00:00Z | B          |
| 2018-12-27T13:00:00Z | B          |
| 2018-12-27T14:00:00Z | B          |
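
For reference, the transformation described above can be sketched in plain Python, without Spark or pandas. This is only an illustration of the intended behaviour on the sample data, not the PySpark solution being asked for; `fill_hourly_gaps` and `hour` are hypothetical names introduced here, and the rows are assumed to be simple `(timestamp, aggregator)` tuples.

```python
from collections import defaultdict
from datetime import datetime, timedelta

def fill_hourly_gaps(rows):
    """Return rows with one row added for every missing hour per aggregator.

    `rows` is a list of (timestamp, aggregator) tuples. Each added row
    reuses the aggregator of the row just before the gap (a forward fill).
    """
    by_aggregator = defaultdict(list)
    for timestamp, aggregator in rows:
        by_aggregator[aggregator].append(timestamp)

    filled = []
    for aggregator in sorted(by_aggregator):
        timestamps = sorted(by_aggregator[aggregator])
        for current, following in zip(timestamps, timestamps[1:]):
            filled.append((current, aggregator))
            # Walk forward one hour at a time until the next real row.
            step = current + timedelta(hours=1)
            while step < following:
                filled.append((step, aggregator))
                step += timedelta(hours=1)
        # The last timestamp in the group has no successor to pair with.
        filled.append((timestamps[-1], aggregator))
    return filled


def hour(h):
    # Hypothetical helper: an hour on the sample date, 2018-12-27.
    return datetime(2018, 12, 27, h)

sample = [(hour(h), "A") for h in (9, 10, 12, 13)] + \
         [(hour(h), "B") for h in (9, 10, 11, 13, 14)]

for timestamp, aggregator in fill_hourly_gaps(sample):
    print(timestamp.isoformat(), aggregator)
```

With only two columns, the filled rows differ from their neighbours in the timestamp alone; with more columns, a forward fill would copy the remaining values from the row before the gap.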

Any help is appreciated.

Thanks.

Recommended answer

Here is a solution that fills in the missing hours using a window, lag and a UDF. With a little modification it can be extended to days as well.

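from datetime import timedelta

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, TimestampType

spark = SparkSession.builder.getOrCreate()

def missing_hours(t1, t2):
    # Hourly timestamps strictly between t2 (previous row) and t1 (current row).
    # Using the full time difference keeps this correct across midnight.
    gap = int((t1 - t2).total_seconds() // 3600)
    return [t2 + timedelta(hours=x) for x in range(1, gap)]

missing_hours_udf = F.udf(missing_hours, ArrayType(TimestampType()))

df = spark.read.csv('dates.csv', header=True, inferSchema=True)

# Order each aggregator's rows by time so lag() returns the previous timestamp.
window = Window.partitionBy("aggregator").orderBy("timestamp")

# One new row per missing hour; other columns are copied from the row after the gap.
df_missing = df.withColumn("prev_timestamp", F.lag("timestamp", 1).over(window))\
       .filter(F.col("prev_timestamp").isNotNull())\
       .withColumn("timestamp", F.explode(missing_hours_udf(F.col("timestamp"), F.col("prev_timestamp"))))\
       .drop("prev_timestamp")

df.union(df_missing).orderBy("aggregator", "timestamp").show()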

Result

+-------------------+----------+
|          timestamp|aggregator|
+-------------------+----------+
|2018-12-27 09:00:00|         A|
|2018-12-27 10:00:00|         A|
|2018-12-27 11:00:00|         A|
|2018-12-27 12:00:00|         A|
|2018-12-27 13:00:00|         A|
|2018-12-27 09:00:00|         B|
|2018-12-27 10:00:00|         B|
|2018-12-27 11:00:00|         B|
|2018-12-27 12:00:00|         B|
|2018-12-27 13:00:00|         B|
|2018-12-27 14:00:00|         B|
+-------------------+----------+
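
The answer notes that the approach can be extended to days. Below is a minimal sketch of how the gap-generating helper might be generalised, written in plain Python so it can be tried without a Spark session. `missing_steps` and its `step` parameter are hypothetical names and not part of the original answer. Because it compares full timestamps rather than hour fields, gaps that span midnight are handled as well.

```python
from datetime import datetime, timedelta

def missing_steps(current, previous, step=timedelta(hours=1)):
    """List the timestamps strictly between `previous` and `current`,
    spaced `step` apart, counting forward from `previous`."""
    missing = []
    candidate = previous + step
    while candidate < current:
        missing.append(candidate)
        candidate += step
    return missing

# A gap that crosses midnight: 23:00 on the 27th to 02:00 on the 28th.
overnight = missing_steps(datetime(2018, 12, 28, 2), datetime(2018, 12, 27, 23))

# The same helper with a daily step.
daily = missing_steps(datetime(2018, 12, 30), datetime(2018, 12, 27),
                      step=timedelta(days=1))
```

A function of this shape could presumably be wrapped in a UDF returning `ArrayType(TimestampType())`, in the same way as `missing_hours`, though that combination is untested here.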
