Pyspark:使用DataFrame的最后观察结果进行正向填充 [英] Pyspark : forward fill with last observation for a DataFrame
问题描述
使用Spark 1.5.1,
Using Spark 1.5.1,
我一直在尝试用DataFrame的一列的最后一个已知观察结果来填充空值.
I've been trying to forward fill null values with the last known observation for one column of my DataFrame.
可以从一个空值开始,在这种情况下,我将使用第一个已知观测值向后填充该空值.但是,如果这也使代码复杂化,则可以跳过这一点.
It is possible to start with a null value and for this case I would to backward fill this null value with the first knwn observation. However, If that too complicates the code, this point can be skipped.
在此帖子中,在Scala中提供了一个解决方案 zero323 .
In this post, a solution in Scala was provided for a very similar problem by zero323.
但是,我不了解Scala,也无法在Pyspark API代码中成功翻译"它.可以用Pyspark做到吗?
But, I don't know Scala and I don't succeed to ''translate'' it in Pyspark API code. It's possible to do it with Pyspark ?
感谢您的帮助.
下面是一个简单的示例输入示例:
Below, a simple example sample input:
| cookie_ID | Time | User_ID
| ------------- | -------- |-------------
| 1 | 2015-12-01 | null
| 1 | 2015-12-02 | U1
| 1 | 2015-12-03 | U1
| 1 | 2015-12-04 | null
| 1 | 2015-12-05 | null
| 1 | 2015-12-06 | U2
| 1 | 2015-12-07 | null
| 1 | 2015-12-08 | U1
| 1 | 2015-12-09 | null
| 2 | 2015-12-03 | null
| 2 | 2015-12-04 | U3
| 2 | 2015-12-05 | null
| 2 | 2015-12-06 | U4
预期输出:
| cookie_ID | Time | User_ID
| ------------- | -------- |-------------
| 1 | 2015-12-01 | U1
| 1 | 2015-12-02 | U1
| 1 | 2015-12-03 | U1
| 1 | 2015-12-04 | U1
| 1 | 2015-12-05 | U1
| 1 | 2015-12-06 | U2
| 1 | 2015-12-07 | U2
| 1 | 2015-12-08 | U1
| 1 | 2015-12-09 | U1
| 2 | 2015-12-03 | U3
| 2 | 2015-12-04 | U3
| 2 | 2015-12-05 | U3
| 2 | 2015-12-06 | U4
推荐答案
The partitioned example code from Spark / Scala: forward fill with last observation in pyspark is shown. This only works for data that can be partitioned.
加载数据
values = [
(1, "2015-12-01", None),
(1, "2015-12-02", "U1"),
(1, "2015-12-02", "U1"),
(1, "2015-12-03", "U2"),
(1, "2015-12-04", None),
(1, "2015-12-05", None),
(2, "2015-12-04", None),
(2, "2015-12-03", None),
(2, "2015-12-02", "U3"),
(2, "2015-12-05", None),
]
rdd = sc.parallelize(values)
df = rdd.toDF(["cookie_id", "c_date", "user_id"])
df = df.withColumn("c_date", df.c_date.cast("date"))
df.show()
DataFrame是
The DataFrame is
+---------+----------+-------+
|cookie_id| c_date|user_id|
+---------+----------+-------+
| 1|2015-12-01| null|
| 1|2015-12-02| U1|
| 1|2015-12-02| U1|
| 1|2015-12-03| U2|
| 1|2015-12-04| null|
| 1|2015-12-05| null|
| 2|2015-12-04| null|
| 2|2015-12-03| null|
| 2|2015-12-02| U3|
| 2|2015-12-05| null|
+---------+----------+-------+
用于对分区进行排序的列
Column used to sort the partitions
# get the sort key
def getKey(item):
return item.c_date
填充功能.如有必要,可用于填写多列.
The fill function. Can be used to fill in multiple columns if necessary.
# fill function
def fill(x):
out = []
last_val = None
for v in x:
if v["user_id"] is None:
data = [v["cookie_id"], v["c_date"], last_val]
else:
data = [v["cookie_id"], v["c_date"], v["user_id"]]
last_val = v["user_id"]
out.append(data)
return out
转换为rdd,对其进行分区,排序并填充缺失的值
Convert to rdd, partition, sort and fill the missing values
# Partition the data
rdd = df.rdd.groupBy(lambda x: x.cookie_id).mapValues(list)
# Sort the data by date
rdd = rdd.mapValues(lambda x: sorted(x, key=getKey))
# fill missing value and flatten
rdd = rdd.mapValues(fill).flatMapValues(lambda x: x)
# discard the key
rdd = rdd.map(lambda v: v[1])
转换回DataFrame
Convert back to DataFrame
df_out = sqlContext.createDataFrame(rdd)
df_out.show()
输出为
+---+----------+----+
| _1| _2| _3|
+---+----------+----+
| 1|2015-12-01|null|
| 1|2015-12-02| U1|
| 1|2015-12-02| U1|
| 1|2015-12-03| U2|
| 1|2015-12-04| U2|
| 1|2015-12-05| U2|
| 2|2015-12-02| U3|
| 2|2015-12-03| U3|
| 2|2015-12-04| U3|
| 2|2015-12-05| U3|
+---+----------+----+
这篇关于Pyspark:使用DataFrame的最后观察结果进行正向填充的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!