Pyspark:观察到pyspark数据框中的缺失值的插值 [英] Pyspark : Interpolation of missing values in pyspark dataframe observed

查看:509
本文介绍了Pyspark:观察到pyspark数据框中的缺失值的插值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用未完全填充且相当大的spark清理时间序列数据集.

I am trying to clean a time series dataset using spark that is not fully populated and fairly large.

我想做的就是像这样转换以下数据集

What I would like to do is convert the following dataset as such

Group | TS          |  Value
____________________________
A     | 01-01-2018  |  1
A     | 01-02-2018  |  2
A     | 01-03-2018  |  
A     | 01-04-2018  |  
A     | 01-05-2018  |  5
A     | 01-06-2018  |  
A     | 01-07-2018  |  10
A     | 01-08-2018  |  11

并将其转换为以下内容

Group | TS          |  Value>
____________________________
A     | 01-01-2018  |  1
A     | 01-02-2018  |  2
A     | 01-03-2018  |  3
A     | 01-04-2018  |  4
A     | 01-05-2018  |  5
A     | 01-06-2018  |  7.5
A     | 01-07-2018  |  10
A     | 01-08-2018  |  11

如果您能提供帮助,将不胜感激.

If you can help that would be greatly appreciated.

推荐答案

我已经实现了一个主要基于窗口函数的针对Spark 2.2的解决方案. 希望仍然可以帮助别人!

I have implemented a solution working for Spark 2.2, mainly based on window functions. Hope could still help someone other!

首先,让我们重新创建数据框:

First, let's recreate the dataframe:

from pyspark.sql import functions as F
from pyspark.sql import Window

data = [
    ("A","01-01-2018",1),
    ("A","01-02-2018",2),
    ("A","01-03-2018",None),
    ("A","01-04-2018",None),
    ("A","01-05-2018",5),
    ("A","01-06-2018",None),
    ("A","01-07-2018",10),
    ("A","01-08-2018",11)
]
df = spark.createDataFrame(data,['Group','TS','Value'])
df = df.withColumn('TS',F.unix_timestamp('TS','MM-dd-yyyy').cast('timestamp'))

现在,该功能:

def fill_linear_interpolation(df,id_cols,order_col,value_col):
    """ 
    Apply linear interpolation to dataframe to fill gaps. 

    :param df: spark dataframe
    :param id_cols: string or list of column names to partition by the window function 
    :param order_col: column to use to order by the window function
    :param value_col: column to be filled

    :returns: spark dataframe updated with interpolated values
    """
    # create row number over window and a column with row number only for non missing values
    w = Window.partitionBy(id_cols).orderBy(order_col)
    new_df = new_df.withColumn('rn',F.row_number().over(w))
    new_df = new_df.withColumn('rn_not_null',F.when(F.col(value_col).isNotNull(),F.col('rn')))

    # create relative references to the start value (last value not missing)
    w_start = Window.partitionBy(id_cols).orderBy(order_col).rowsBetween(Window.unboundedPreceding,-1)
    new_df = new_df.withColumn('start_val',F.last(value_col,True).over(w_start))
    new_df = new_df.withColumn('start_rn',F.last('rn_not_null',True).over(w_start))

    # create relative references to the end value (first value not missing)
    w_end = Window.partitionBy(id_cols).orderBy(order_col).rowsBetween(0,Window.unboundedFollowing)
    new_df = new_df.withColumn('end_val',F.first(value_col,True).over(w_end))
    new_df = new_df.withColumn('end_rn',F.first('rn_not_null',True).over(w_end))

    # create references to gap length and current gap position  
    new_df = new_df.withColumn('diff_rn',F.col('end_rn')-F.col('start_rn'))
    new_df = new_df.withColumn('curr_rn',F.col('diff_rn')-(F.col('end_rn')-F.col('rn')))

    # calculate linear interpolation value
    lin_interp_func = (F.col('start_val')+(F.col('end_val')-F.col('start_val'))/F.col('diff_rn')*F.col('curr_rn'))
    new_df = new_df.withColumn(value_col,F.when(F.col(value_col).isNull(),lin_interp_func).otherwise(F.col(value_col)))

    keep_cols = id_cols + [order_col,value_col]
    new_df = new_df.select(keep_cols)
    return new_df

最后:

new_df = fill_linear_interpolation(df=df,id_cols='Group',order_col='TS',value_col='Value')
#+-----+-------------------+-----+
#|Group|                 TS|Value|
#+-----+-------------------+-----+
#|    A|2018-01-01 00:00:00|  1.0|
#|    A|2018-01-02 00:00:00|  2.0|
#|    A|2018-01-03 00:00:00|  3.0|
#|    A|2018-01-04 00:00:00|  4.0|
#|    A|2018-01-05 00:00:00|  5.0|
#|    A|2018-01-06 00:00:00|  7.5|
#|    A|2018-01-07 00:00:00| 10.0|
#|    A|2018-01-08 00:00:00| 11.0|
#+-----+-------------------+-----+

这篇关于Pyspark:观察到pyspark数据框中的缺失值的插值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆