Spark/Scala:用最后的良好观察填充nan [英] Spark / Scala: fill nan with last good observation
问题描述
我正在使用spark 2.0.1,并希望用列中的最后一个已知值来填充nan值.
关于火花的唯一参考资料,我可以找到 Spark/Scala:向前填充最后一次观察或用以前的空值填充pyspark 具有很好的价值,它似乎使用了RDD.
我想留在数据框/数据集世界中,并可能处理多个nan值. 这可能吗?
我的假设是,数据(最初是从CSV文件中加载的)是按时间排序的,并且此顺序保留在分布式设置中,例如,用close/last已知值填充是正确的.也许用先前的值填充就足够了就大多数记录而言,连续没有2个或更多的Nan记录,这实际上成立吗? 重点是
myDf.sort("foo").show
会破坏任何命令,例如所有null
值都将排在首位.
一个小例子:
import java.sql.{ Date, Timestamp }
case class FooBar(foo:Date, bar:String)
val myDf = Seq(("2016-01-01","first"),("2016-01-02","second"),("2016-wrongFormat","noValidFormat"), ("2016-01-04","lastAssumingSameDate"))
.toDF("foo","bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
结果
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
| null| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
我想用最近的已知值来固定该值.我该如何实现?
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
|2016-01-02| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
编辑
对于我来说,填充上面一行中的值就足够了,因为只有非常有限的错误值.
edit2
我尝试添加索引列
val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
.toDF("foo", "bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
.withColumn("rowId", monotonically_increasing_id())
然后填写最后一个值.
myDf.withColumn("fooLag", lag('foo, 1) over Window.orderBy('rowId)).show
但这会显示以下警告: 没有为窗口操作定义分区!将所有数据移动到单个分区中,这会导致严重的性能下降.如何引入有意义的分区?
+----------+--------------------+-----+----------+
| foo| bar|rowId| fooLag|
+----------+--------------------+-----+----------+
|2016-01-01| first| 0| null|
|2016-01-02| second| 1|2016-01-01|
| null| noValidFormat| 2|2016-01-02|
|2016-01-04|lastAssumingSameDate| 3| null|
+----------+--------------------+-----+----------+
这是一个中间答案.但是,由于没有分区/仅使用一个分区,所以它不是很好.我仍在寻找解决问题的更好方法
df
.withColumn("rowId", monotonically_increasing_id())
.withColumn("replacement", lag('columnWithNull, 1) over Window.orderBy('rowId))
.withColumn("columnWithNullReplaced",
when($"columnWithNull" isNull, "replacement").otherwise($"columnWithNull")
)
编辑
我正在使用mapPartitionsWithIndex
建立一个更好的解决方案
https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2 尚未完成.. >
edit2
添加
if (i == 0) {
lastNotNullRow = toCarryBd.value.get(i + 1).get
} else {
lastNotNullRow = toCarryBd.value.get(i - 1).get
}
将导致预期的结果.
I am using the spark 2.0.1 and want to fill nan values with the last good known value in the column.
The only reference for spark I could find Spark / Scala: forward fill with last observation or Fill in null with previously known good value with pyspark which seem to use RDD.
I would rather like to stay in the data frame / dataset world and possible handle multiple nan values. Is this possible?
My assumption is that the data (initially loaded from e.g. a CSV file is ordered by time and this order is preserved in the distributed setting e.g. filling by close / last good known value is correct. Maybe filling with the previous value is enough as for most records there are no 2 or more nan records in a row. Does this actually hold? The point is that a
myDf.sort("foo").show
Would destroy any order e.g. all null
values will come first.
A small example:
import java.sql.{ Date, Timestamp }
case class FooBar(foo:Date, bar:String)
val myDf = Seq(("2016-01-01","first"),("2016-01-02","second"),("2016-wrongFormat","noValidFormat"), ("2016-01-04","lastAssumingSameDate"))
.toDF("foo","bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
Results in
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
| null| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
I would like to fix the value with the last good known value. How can I achieve this?
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
|2016-01-02| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
edit
in my case, it would be good enough to fill the value from the row above, as there are only very limited faulty values.
edit2
I try to add an index column
val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
.toDF("foo", "bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
.withColumn("rowId", monotonically_increasing_id())
And then fill with the last value.
myDf.withColumn("fooLag", lag('foo, 1) over Window.orderBy('rowId)).show
But that reads the following warning: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. How could I introduce meaningful partitions?
+----------+--------------------+-----+----------+
| foo| bar|rowId| fooLag|
+----------+--------------------+-----+----------+
|2016-01-01| first| 0| null|
|2016-01-02| second| 1|2016-01-01|
| null| noValidFormat| 2|2016-01-02|
|2016-01-04|lastAssumingSameDate| 3| null|
+----------+--------------------+-----+----------+
This is an intermediate answer. However, it is not great as no partitions / only a single partition is used. I am still looking for a better way to solve the problem
df
.withColumn("rowId", monotonically_increasing_id())
.withColumn("replacement", lag('columnWithNull, 1) over Window.orderBy('rowId))
.withColumn("columnWithNullReplaced",
when($"columnWithNull" isNull, "replacement").otherwise($"columnWithNull")
)
edit
I am working on building a better solution using mapPartitionsWithIndex
https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2 is not complete yet.
edit2
adding
if (i == 0) {
lastNotNullRow = toCarryBd.value.get(i + 1).get
} else {
lastNotNullRow = toCarryBd.value.get(i - 1).get
}
will lead to the desired result.
这篇关于Spark/Scala:用最后的良好观察填充nan的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!