Pyspark 在列级别内向前和向后填充 [英] Pyspark forward and backward fill within column level
问题描述
我尝试在 pyspark 数据框中填充缺失的数据.pyspark 数据框如下所示:
I try to fill missing data in a pyspark dataframe. The pyspark dataframe looks as such:
+---------+---------+-------------------+----+
| latitude|longitude| timestamplast|name|
+---------+---------+-------------------+----+
| | 4.905615|2019-08-01 00:00:00| 1|
|51.819645| |2019-08-01 00:00:00| 1|
| 51.81964| 4.961713|2019-08-01 00:00:00| 2|
| | |2019-08-01 00:00:00| 3|
| 51.82918| 4.911187| | 3|
| 51.82385| 4.901488|2019-08-01 00:00:03| 5|
+---------+---------+-------------------+----+
在名称"列中,我想向前填充或向后填充(以必要为准)以仅填充纬度"和经度"(不应填充时间戳").我该怎么做?
Within the column "name" I want to either forward fill or backward fill (whichever is necessary) to fill only "latitude" and "longitude" ("timestamplast" should not be filled). How do I do this?
输出将是:
+---------+---------+-------------------+----+
| latitude|longitude| timestamplast|name|
+---------+---------+-------------------+----+
|51.819645| 4.905615|2019-08-01 00:00:00| 1|
|51.819645| 4.905615|2019-08-01 00:00:00| 1|
| 51.81964| 4.961713|2019-08-01 00:00:00| 2|
| 51.82918| 4.911187|2019-08-01 00:00:00| 3|
| 51.82918| 4.911187| | 3|
| 51.82385| 4.901488|2019-08-01 00:00:03| 5|
+---------+---------+-------------------+----+
在 Pandas 中,这将是这样完成的:
In Pandas this would be done as such:
df = df.groupby("name")['longitude','latitude'].apply(lambda x : x.ffill().bfill())
这在 Pyspark 中如何实现?
How would this be done in Pyspark?
推荐答案
我建议你使用以下两个 Window Specs:
I suggest you use the following two Window Specs:
from pyspark.sql import Window
w1 = Window.partitionBy('name').orderBy('timestamplast')
w2 = w1.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
地点:
w1 是我们用来计算前向填充的常规 WinSpec,与以下内容相同:
w1 is the regular WinSpec we use to calculate the forward-fill which is the same as the following:
w1 = Window.partitionBy('name').orderBy('timestamplast').rowsBetween(Window.unboundedPreceding,0)
请参阅 文档默认窗口框架:
see the following note from the documentation for default window frames:
注意:当未定义排序时,默认使用无界窗口框架(rowFrame、unboundedPreceding、unboundedFollowing).定义排序时,默认使用不断增长的窗口框架(rangeFrame、unboundedPreceding、currentRow).
Note: When ordering is not defined, an unbounded window frame (rowFrame, unboundedPreceding, unboundedFollowing) is used by default. When ordering is defined, a growing window frame (rangeFrame, unboundedPreceding, currentRow) is used by default.
在ffill
之后,如果存在的话,我们只需要修复最前面的空值,这样我们就可以使用一个固定的Window框架(在Window.unboundedPreceding和Window.unboundedFollowing之间),这比使用正在运行的 Window 框架更有效,因为它只需要一个聚合,请参阅 SPARK-8638
after ffill
, we only need to fix the null values at the very front if exists, so we can use a fixed Window frame(Between Window.unboundedPreceding and Window.unboundedFollowing), this is more efficient than using a running Window frame since it requires only one aggregate, see SPARK-8638
然后可以使用 coalesce + last + first 基于以上两个WindowSpecs:
Then the x.ffill().bfill() can be handled by using coalesce + last + first based on the above two WindowSpecs:
from pyspark.sql.functions import coalesce, last, first
df.withColumn('latitude_new', coalesce(last('latitude',True).over(w1), first('latitude',True).over(w2))) \
.select('name','timestamplast', 'latitude','latitude_new') \
.show()
+----+-------------------+---------+------------+
|name| timestamplast| latitude|latitude_new|
+----+-------------------+---------+------------+
| 1|2019-08-01 00:00:00| null| 51.819645|
| 1|2019-08-01 00:00:01| null| 51.819645|
| 1|2019-08-01 00:00:02|51.819645| 51.819645|
| 1|2019-08-01 00:00:03| 51.81964| 51.81964|
| 1|2019-08-01 00:00:04| null| 51.81964|
| 1|2019-08-01 00:00:05| null| 51.81964|
| 1|2019-08-01 00:00:06| null| 51.81964|
| 1|2019-08-01 00:00:07| 51.82385| 51.82385|
+----+-------------------+---------+------------+
在多列上处理 (ffill+bfill),使用列表推导式:
to process (ffill+bfill) on multiple columns, use a list comprehension:
cols = ['latitude', 'longitude']
df_new = df.select([ c for c in df.columns if c not in cols ] + [ coalesce(last(c,True).over(w1), first(c,True).over(w2)).alias(c) for c in cols ])
这篇关于Pyspark 在列级别内向前和向后填充的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!