Spark / Scala: forward fill with last observation


Problem description

Using Spark 1.4.0, Scala 2.10

I've been trying to figure out a way to forward fill null values with the last known observation, but I don't see an easy way. I would think this is a pretty common thing to do, but can't find an example showing how to do this.

I see functions to forward fill NaN with a value, or lag / lead functions to fill or shift data by an offset, but nothing to pick up the last known value.

Looking online, I see lots of Q/A about the same thing in R, but not in Spark / Scala.

I was thinking about mapping over a date range, filtering the NaNs out of the results and picking the last element, but I guess I'm confused about the syntax.

Using DataFrames, I tried something like this:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)

var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("testForwardFill", (90 to 0).map(i=>lag(df.col("myValue"),i,0).over(spec)).filter(p=>p.getItem.isNotNull).last)

but that doesn't get me anywhere.

The filter part doesn't work: the map function returns a sequence of spark.sql.Column, but the filter function expects a Boolean, so I need to get a value out of the Column to test on, but there only seem to be Column methods that return a Column.

Is there any way to do this more 'simply' on Spark?

Thanks for your input.

Edit:

Simple example input:

2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...

Expected output:

2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22

Note:

  1. I have many columns, and many of them have this pattern of missing data, but at different dates/times. If I have to, I will convert one column at a time.

Edit:

Following @zero323's answer, I tried it this way:

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD

    val rows: RDD[Row] = df.orderBy($"Date").rdd

    def notMissing(row: Row): Boolean = { !row.isNullAt(1) }

    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
      .mapPartitionsWithIndex{ case (i, iter) =>
        Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
      .collectAsMap

    val toCarryBd = sc.broadcast(toCarry)

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
      if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter
    }

    val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) }

The broadcast variable ends up as a list of values without nulls. That's progress, but I still can't get the mapping to work: I get nothing, because the index i doesn't map to the original data, it maps to the subset without nulls.

What am I missing here?

Edit and solution (extrapolated from @zero323's answer):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)

var spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

val df2 = df.withColumn("test", coalesce((0 to 90).map(i=>lag(df.col("test"),i,0).over(spec)): _*))

See zero323's answer below for more options if you're using RDDs instead of DataFrames. The solution above may not be the most efficient, but it works for me. If you're looking to optimize, check out the RDD solution.

Recommended answer

Initial answer (assuming a single time series):

First of all, try to avoid window functions if you cannot provide a PARTITION BY clause. Without one, all the data is moved to a single partition, so most of the time it is simply not feasible.

What you can do is fill the gaps on an RDD using mapPartitionsWithIndex. Since you didn't provide example data or expected output, consider this pseudocode rather than a real Scala program:

  • First, let's order the DataFrame by Date and convert it to an RDD:

import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD

val rows: RDD[Row] = df.orderBy($"Date").rdd

  • Next, let's find the last non-null observation per partition:

    def notMissing(row: Row): Boolean = ???
    
    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
      .mapPartitionsWithIndex{ case (i, iter) => 
        Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
      .collectAsMap
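
A minimal concrete notMissing, assuming the value to carry forward sits at column index 1 as in the sample data, could simply be:

    // Sketch: a row is "not missing" when its value column (index 1) is non-null
    def notMissing(row: Row): Boolean = !row.isNullAt(1)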
    

  • And turn this Map into a broadcast variable:

    val toCarryBd = sc.broadcast(toCarry)
    

  • Finally, map over the partitions once again, filling the gaps:

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
      // If it is the beginning of partition and value is missing
      // extract value to fill from toCarryBd.value
      // Remember to correct for empty / only missing partitions
      // otherwise take last not-null from the current partition
    }
    
    val imputed: RDD[Row] = rows
      .mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) } 
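
A concrete fill might look like the sketch below. It assumes the two-column layout of the sample data (Date at index 0, the value at index 1) and that rows within each partition are already sorted by Date; it illustrates the carry-over logic rather than a canonical implementation:

    // Sketch: carry the last non-null value forward within a partition,
    // seeding it with the last non-null row of the closest preceding partition.
    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
      // Value carried in from earlier partitions (None for the first partition
      // or when every earlier partition is empty / all-null)
      val carriedIn: Option[Any] = (0 until i).reverse
        .flatMap(j => toCarryBd.value.getOrElse(j, None))
        .headOption
        .map(_.get(1))

      var lastSeen: Option[Any] = carriedIn
      iter.map { row =>
        if (row.isNullAt(1)) {
          // Missing: substitute the last observed value (may still be null
          // if nothing has been observed yet)
          Row(row.get(0), lastSeen.orNull)
        } else {
          lastSeen = Some(row.get(1))
          row
        }
      }
    }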
    

  • Finally, convert back to a DataFrame:
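
A one-line sketch, assuming the imputed rows still match df's original schema:

    val dfImputed = sqlContext.createDataFrame(imputed, df.schema)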

The devil is in the detail. If your data is partitioned after all, then the whole problem can be solved using groupBy. Let's assume you simply partition by column "k" of type T and Date is an integer timestamp:

    def fill(iter: List[Row]): List[Row] = {
      // Just go row by row and fill with last non-empty value
      ???
    }
    
    val groupedAndSorted = df.rdd
      .groupBy(_.getAs[T]("k"))
      .mapValues(_.toList.sortBy(_.getAs[Int]("Date")))
    
    val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)
    
    val dfFilled = sqlContext.createDataFrame(rows, df.schema)
    

This way you can fill all columns at the same time.
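
For instance, a fill that walks the sorted rows and carries the last non-null value forward in every column could look like this sketch (a generic illustration rather than the answer's exact implementation):

    // Sketch: replace each null with the last non-null value seen in the same column
    def fill(iter: List[Row]): List[Row] = {
      val lastSeen = scala.collection.mutable.Map.empty[Int, Any]
      iter.map { row =>
        val patched = row.toSeq.zipWithIndex.map { case (v, idx) =>
          if (v == null) lastSeen.getOrElse(idx, null)
          else { lastSeen(idx) = v; v }
        }
        Row.fromSeq(patched)
      }
    }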

Can this be done with DataFrames instead of converting back and forth to RDDs?

It depends, although it is unlikely to be efficient. If the maximum gap is relatively small, you can do something like this:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.{WindowSpec, Window}
    import org.apache.spark.sql.Column

    val w: WindowSpec = ???  // Window ordered by Date (add partitionBy if you can)
    val maxGap: Int = ???  // Maximum gap between observations
    val columnsToFill: List[String] = ???  // List of columns to fill
    val suffix: String = "_"  // To disambiguate between original and imputed

    // Take lag 1 to maxGap and coalesce
    def makeCoalesce(w: WindowSpec)(maxGap: Int)(suffix: String)(c: String) = {
      // Generate lag values between 1 and maxGap
      val lags = (1 to maxGap).map(lag(col(c), _).over(w))
      // Add current, coalesce and set alias
      coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
    }

    // For each column you want to fill nulls apply makeCoalesce
    val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)(suffix))

    // Finally select
    val dfImputed = df.select($"*" :: lags: _*)
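
For instance, filling in the ??? placeholders for the single-series sample data above might look like this sketch (the column name "myValue", the gap of 90 and the "_filled" suffix are assumptions):

    // Hypothetical usage: one series ordered by Date, one value column
    val w = Window.orderBy($"Date")   // add partitionBy(...) when a key column exists
    val maxGap = 90                   // assumed largest run of consecutive nulls
    val lags: List[Column] = List("myValue").map(makeCoalesce(w)(maxGap)("_filled"))
    val dfImputed = df.select($"*" :: lags: _*)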
    

It can easily be adjusted to use a different maximum gap per column.
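
For example, a sketch using a hypothetical map from column name to its own maximum gap (the names and gap sizes are made up), reusing w, suffix and makeCoalesce from above:

    // Hypothetical per-column maximum gaps
    val maxGaps: Map[String, Int] = Map("temperature" -> 30, "pressure" -> 90)

    val perColumnLags: List[Column] = maxGaps.toList.map { case (c, gap) =>
      makeCoalesce(w)(gap)(suffix)(c)
    }
    val dfImputedPerColumn = df.select($"*" :: perColumnLags: _*)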

A simpler way to achieve a similar result in the latest Spark versions is to use last with ignoreNulls:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window
    
    val w = Window.partitionBy($"k").orderBy($"Date")
      .rowsBetween(Window.unboundedPreceding, -1)
    
    df.withColumn("value", coalesce($"value", last($"value", true).over(w)))
    

While it is possible to drop the partitionBy clause and apply this method globally, it would be prohibitively expensive with large datasets.
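
For reference, dropping partitionBy just means using a global window like the sketch below, which pulls every row into a single partition:

    // Global window: no partitionBy, so all rows end up in one partition
    val wGlobal = Window.orderBy($"Date")
      .rowsBetween(Window.unboundedPreceding, -1)

    df.withColumn("value", coalesce($"value", last($"value", true).over(wGlobal)))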

