Spark / Scala: fill nan with last good observation


Problem description


I am using Spark 2.0.1 and want to fill nan values with the last known good value in the column.

The only references for Spark I could find are Spark / Scala: forward fill with last observation and Fill in null with previously known good value with pyspark, both of which seem to use RDDs.

I would rather stay in the DataFrame / Dataset world and possibly handle multiple nan values. Is this possible?

My assumption is that the data (initially loaded from e.g. a CSV file) is ordered by time, and that this order is preserved in the distributed setting, so filling with the closest / last known good value is correct. Maybe filling with the previous value is enough, as for most records there are no 2 or more nan records in a row. Does this actually hold? The point is that

myDf.sort("foo").show

would destroy any ordering, e.g. all null values would come first.

A small example:

import java.sql.{ Date, Timestamp }
case class FooBar(foo:Date, bar:String)
val myDf = Seq(("2016-01-01","first"),("2016-01-02","second"),("2016-wrongFormat","noValidFormat"), ("2016-01-04","lastAssumingSameDate"))
         .toDF("foo","bar")
         .withColumn("foo", 'foo.cast("Date"))
         .as[FooBar]

Results in

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|      null|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

I would like to fix this value using the last known good value. How can I achieve this?

+----------+--------------------+
|       foo|                 bar|
+----------+--------------------+
|2016-01-01|               first|
|2016-01-02|              second|
|2016-01-02|       noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+

edit

In my case it would be good enough to fill with the value from the row above, as there are only very few faulty values.

edit2

I tried to add an index column

val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
    .toDF("foo", "bar")
    .withColumn("foo", 'foo.cast("Date"))
    .as[FooBar]
    .withColumn("rowId", monotonically_increasing_id())

And then fill with the last value.

myDf.withColumn("fooLag", lag('foo, 1) over Window.orderBy('rowId)).show

But that produces the following warning: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation. How could I introduce meaningful partitions? (One possible approach is sketched after the output below.)

+----------+--------------------+-----+----------+
|       foo|                 bar|rowId|    fooLag|
+----------+--------------------+-----+----------+
|2016-01-01|               first|    0|      null|
|2016-01-02|              second|    1|2016-01-01|
|      null|       noValidFormat|    2|2016-01-02|
|2016-01-04|lastAssumingSameDate|    3|      null|
+----------+--------------------+-----+----------+
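
If the data had a natural grouping column (hypothetical here, e.g. a per-series id; the example data has none), the window could be partitioned by it so each group is lagged independently and the warning disappears. A minimal sketch under that assumption:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// hypothetical: assumes a grouping column `seriesId` exists in the data
// each series is ordered and lagged on its own, so Spark can distribute the groups
val w = Window.partitionBy(col("seriesId")).orderBy(col("rowId"))

myDf.withColumn("fooLag", lag(col("foo"), 1).over(w)).show()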

Solution

This is an intermediate answer. However, it is not great, as only a single partition is used. I am still looking for a better way to solve the problem.

// assumes spark.implicits._ is in scope (for the ' and $ column syntax)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

df
    .withColumn("rowId", monotonically_increasing_id())
    // value of the previous row, ordered by the artificial row id
    .withColumn("replacement", lag('columnWithNull, 1) over Window.orderBy('rowId))
    // keep the original value where present, otherwise fall back to the lagged one
    .withColumn("columnWithNullReplaced",
      when($"columnWithNull".isNull, $"replacement").otherwise($"columnWithNull"))
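
A variant of this (not part of the original answer, just a sketch under the same rowId assumption) handles several consecutive nulls in one pass by taking the last non-null value over a running window. It still orders by the global rowId, so the single-partition caveat remains unless a partitioning column is added:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// running frame from the first row up to the current row
// (Long.MinValue instead of Window.unboundedPreceding to stay compatible with Spark 2.0.x)
val w = Window.orderBy("rowId").rowsBetween(Long.MinValue, 0)

// last(..., ignoreNulls = true) carries the most recent non-null value forward,
// so runs of two or more nulls are filled as well
df.withColumn("rowId", monotonically_increasing_id())
  .withColumn("columnWithNullReplaced",
    last(col("columnWithNull"), ignoreNulls = true).over(w))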

edit

I am working on a better solution using mapPartitionsWithIndex (https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2), but it is not complete yet.

edit2

adding

if (i == 0) {
  // first partition: nothing before it, so take the carried value of the next partition
  lastNotNullRow = toCarryBd.value.get(i + 1).get
} else {
  // otherwise: seed with the last non-null row of the previous partition
  lastNotNullRow = toCarryBd.value.get(i - 1).get
}

will lead to the desired result.
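
For context, the snippet above only makes sense inside a mapPartitionsWithIndex-based fill such as the one in the gist. Below is a minimal, simplified sketch of how such a fill can be wired together, assuming `foo` is the column to fill; the helper names (toCarryBd, lastNotNullRow) mirror the snippet above, but the handling of the first partition deliberately differs (leading nulls are simply left in place instead of borrowing from the next partition), so details deviate from the gist:

import org.apache.spark.sql.Row

// `spark` is the SparkSession, as in spark-shell
val rows = myDf.toDF().rdd

def hasFoo(row: Row): Boolean = !row.isNullAt(row.fieldIndex("foo"))

// pass 1: last row with a valid foo per partition, broadcast so every
// partition can look up what its predecessor carries over
val toCarry = rows.mapPartitionsWithIndex { case (i, iter) =>
  Iterator((i, iter.filter(hasFoo).toSeq.lastOption))
}.collectAsMap()
val toCarryBd = spark.sparkContext.broadcast(toCarry)

// pass 2: fill nulls within each partition, seeded from the previous partition
def fillPartition(i: Int, iter: Iterator[Row]): Iterator[Row] = {
  var lastNotNullRow: Option[Row] = toCarryBd.value.getOrElse(i - 1, None)
  iter.map { row =>
    if (hasFoo(row)) { lastNotNullRow = Some(row); row }
    else lastNotNullRow.map { carried =>
      val values = row.toSeq.toArray
      values(row.fieldIndex("foo")) = carried.get(carried.fieldIndex("foo"))
      Row.fromSeq(values)
    }.getOrElse(row) // nothing seen yet: leave the null in place
  }
}

val filledRdd = rows.mapPartitionsWithIndex(fillPartition)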
