Pyspark forward and backward fill within column level


Problem description


I am trying to fill missing data in a PySpark dataframe. The dataframe looks like this:

+---------+---------+-------------------+----+
| latitude|longitude|      timestamplast|name|
+---------+---------+-------------------+----+
|         | 4.905615|2019-08-01 00:00:00|   1|
|51.819645|         |2019-08-01 00:00:00|   1|
| 51.81964| 4.961713|2019-08-01 00:00:00|   2|
|         |         |2019-08-01 00:00:00|   3|
| 51.82918| 4.911187|                   |   3|
| 51.82385| 4.901488|2019-08-01 00:00:03|   5|
+---------+---------+-------------------+----+
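For reference, here is a minimal sketch of how a sample DataFrame like this could be constructed (the schema and the use of None for the empty cells are assumptions, not given in the original question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical reconstruction of the sample data; empty cells become nulls.
# timestamplast is kept as a string here; ordering by it still works for this format.
df = spark.createDataFrame(
    [
        (None,      4.905615, "2019-08-01 00:00:00", 1),
        (51.819645, None,     "2019-08-01 00:00:00", 1),
        (51.81964,  4.961713, "2019-08-01 00:00:00", 2),
        (None,      None,     "2019-08-01 00:00:00", 3),
        (51.82918,  4.911187, None,                  3),
        (51.82385,  4.901488, "2019-08-01 00:00:03", 5),
    ],
    "latitude double, longitude double, timestamplast string, name int",
)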


Within each "name" group I want to either forward fill or backward fill (whichever is necessary) only "latitude" and "longitude" ("timestamplast" should not be filled). How do I do this?

The output would be:

+---------+---------+-------------------+----+
| latitude|longitude|      timestamplast|name|
+---------+---------+-------------------+----+
|51.819645| 4.905615|2019-08-01 00:00:00|   1|
|51.819645| 4.905615|2019-08-01 00:00:00|   1|
| 51.81964| 4.961713|2019-08-01 00:00:00|   2|
| 51.82918| 4.911187|2019-08-01 00:00:00|   3|
| 51.82918| 4.911187|                   |   3|
| 51.82385| 4.901488|2019-08-01 00:00:03|   5|
+---------+---------+-------------------+----+


In Pandas this would be done as follows:

df[['longitude', 'latitude']] = df.groupby("name")[['longitude', 'latitude']].transform(lambda x: x.ffill().bfill())


How would this be done in Pyspark?

Recommended answer


I suggest you use the following two Window Specs:

from pyspark.sql import Window
w1 = Window.partitionBy('name').orderBy('timestamplast')
w2 = w1.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

where:


  1. w1 is the regular WinSpec we use to calculate the forward fill; it is the same as the following:

w1 = Window.partitionBy('name').orderBy('timestamplast').rowsBetween(Window.unboundedPreceding,0)


see the following note from the documentation for default window frames:


Note: When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.


  • after ffill, we only need to fix the null values at the very front, if any exist, so we can use a fixed Window frame (between Window.unboundedPreceding and Window.unboundedFollowing); this is more efficient than using a running Window frame since it requires only one aggregate, see SPARK-8638


    Then the x.ffill().bfill() can be handled by using coalesce + last + first based on the above two WindowSpecs:

    from pyspark.sql.functions import coalesce, last, first
    
    df.withColumn('latitude_new', coalesce(last('latitude',True).over(w1), first('latitude',True).over(w2))) \
      .select('name','timestamplast', 'latitude','latitude_new') \
      .show()
    +----+-------------------+---------+------------+
    |name|      timestamplast| latitude|latitude_new|
    +----+-------------------+---------+------------+
    |   1|2019-08-01 00:00:00|     null|   51.819645|
    |   1|2019-08-01 00:00:01|     null|   51.819645|
    |   1|2019-08-01 00:00:02|51.819645|   51.819645|
    |   1|2019-08-01 00:00:03| 51.81964|    51.81964|
    |   1|2019-08-01 00:00:04|     null|    51.81964|
    |   1|2019-08-01 00:00:05|     null|    51.81964|
    |   1|2019-08-01 00:00:06|     null|    51.81964|
    |   1|2019-08-01 00:00:07| 51.82385|    51.82385|
    +----+-------------------+---------+------------+
    


    To process (ffill + bfill) on multiple columns, use a list comprehension:

    cols = ['latitude', 'longitude']
    df_new = df.select(
        [c for c in df.columns if c not in cols]
        + [coalesce(last(c, True).over(w1), first(c, True).over(w2)).alias(c) for c in cols]
    )
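
    For completeness, here is a small end-to-end sketch that stitches the pieces above together against the sample DataFrame built earlier (the name filled and the final orderBy/show are illustrative additions, not part of the original answer):

    from pyspark.sql import Window
    from pyspark.sql.functions import coalesce, first, last

    # Same window specs as above: w1 drives the forward fill, w2 the backward fill.
    w1 = Window.partitionBy('name').orderBy('timestamplast')
    w2 = w1.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

    cols = ['latitude', 'longitude']
    filled = df.select(
        [c for c in df.columns if c not in cols]
        + [coalesce(last(c, True).over(w1), first(c, True).over(w2)).alias(c) for c in cols]
    )

    # Restore the original column order; latitude and longitude should now be
    # filled within each 'name' group, while timestamplast is left untouched.
    filled.select(df.columns).orderBy('name').show()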
    
