Forward fill missing values in Spark/Python


Problem Description



I am attempting to fill in missing values in my Spark dataframe with the previous non-null value (if it exists). I've done this type of thing in Python/Pandas, but my data is too big for Pandas (on a small cluster) and I'm a Spark noob. Is this something Spark can do? Can it do it for multiple columns? If so, how? If not, any suggestions for alternative approaches within the whole Hadoop suite of tools?

Thanks!

Solution

I've found a solution that works without additional coding, by using a Window (described here). So Jeff was right, there is a solution. The full code is below; I'll briefly explain what it does, and for more details just look at the blog.

from pyspark.sql import Window
from pyspark.sql.functions import last
import sys

# define the window
window = Window.orderBy('time')\
               .rowsBetween(-sys.maxsize, 0)

# define the forward-filled column
filled_column_temperature = last(df6['temperature'], ignorenulls=True).over(window)


# do the fill 
spark_df_filled = df6.withColumn('temperature_filled', filled_column_temperature)
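
For context, here is a minimal end-to-end sketch of the same approach. The SparkSession setup and the sample data are my own assumptions; only df6 with its 'time' and 'temperature' columns comes from the answer above:

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import last
import sys

spark = SparkSession.builder.getOrCreate()

# hypothetical sample data with gaps in 'temperature'
df6 = spark.createDataFrame(
    [(1, 20.0), (2, None), (3, None), (4, 25.0), (5, None)],
    ['time', 'temperature'])

window = Window.orderBy('time').rowsBetween(-sys.maxsize, 0)

spark_df_filled = df6.withColumn(
    'temperature_filled',
    last(df6['temperature'], ignorenulls=True).over(window))

spark_df_filled.show()
# rows at time 2 and 3 are filled with 20.0; the row at time 5 with 25.0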

So the idea is to define a sliding Window (more on sliding windows here) through the data which always contains the current row and ALL previous ones:

    window = Window.orderBy('time')\
                   .rowsBetween(-sys.maxsize, 0)

Note that we sort by time, so the data is in the correct order. Also note that using "-sys.maxsize" ensures that the window always includes all previous data and grows continuously as it traverses the data top-down, but there might be more efficient solutions.
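
As one such refinement (a sketch of my own, not part of the original answer): newer Spark versions expose Window.unboundedPreceding and Window.currentRow, which express the same frame without the sys.maxsize trick:

from pyspark.sql import Window
from pyspark.sql.functions import last

# same frame as rowsBetween(-sys.maxsize, 0), written with named constants
window = Window.orderBy('time')\
               .rowsBetween(Window.unboundedPreceding, Window.currentRow)

filled_column_temperature = last('temperature', ignorenulls=True).over(window)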

Using the "last" function, we are always addressing the last row in that window. By passing "ignorenulls=True" we define that if the current row is null, then the function will return the most recent (last) non-null value in the window. Otherwise the actual row's value is used.

Done.

