Spark / Scala: forward fill with last observation


Problem description

Using Spark 1.4.0, Scala 2.10

I've been trying to figure out a way to forward fill null values with the last known observation, but I don't see an easy way. I would think this is a pretty common thing to do, but can't find an example showing how to do this.

I see functions to forward fill NaN with a value, or lag / lead functions to fill or shift data by an offset, but nothing to pick up the last known value.

Looking online, I see lots of Q/A about the same thing in R, but not in Spark / Scala.

I was thinking about mapping over a date range, filtering the NaNs out of the results and picking the last element, but I guess I'm confused about the syntax.

Using DataFrames I tried something like:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)

var spec = Window.orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

// this doesn't work: map yields a Seq of Columns, so filter/last can't test the actual values
val df2 = df.withColumn("testForwardFill", (90 to 0).map(i => lag(df.col("myValue"), i, 0).over(spec)).filter(p => p.getItem.isNotNull).last)

but that doesn't get me anywhere.


The filter part doesn't work; the map function returns a Sequence of spark.sql.Columns, but the filter function expects to return a Boolean, so I need to get a value out of the Column to test on but there only seem to be Column methods that return a Column.

Is there any way to do this more 'simply' on Spark?

Thanks for your input.

EDIT:

Simple example. Sample input:

2015-06-01,33
2015-06-02,
2015-06-03,
2015-06-04,
2015-06-05,22
2015-06-06,
2015-06-07,
...

Desired output:

2015-06-01,33
2015-06-02,33
2015-06-03,33
2015-06-04,33
2015-06-05,22
2015-06-06,22
2015-06-07,22

NOTE: 1) I have many columns, many of which have this missing data pattern, but not at the same date/time. If I need to I will do the transform one column at a time.

EDIT

Following @zero323's answer I tried this:

    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD

    val rows: RDD[Row] = df.orderBy($"Date").rdd


    def notMissing(row: Row): Boolean = { !row.isNullAt(1) }

    val toCarry: scala.collection.Map[Int, Option[org.apache.spark.sql.Row]] = rows
      .mapPartitionsWithIndex { case (i, iter) =>
        Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
      .collectAsMap

    val toCarryBd = sc.broadcast(toCarry)

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = { if (iter.contains(null)) iter.map(row => Row(toCarryBd.value(i).get(1))) else iter }

    val imputed: RDD[Row] = rows.mapPartitionsWithIndex{ case (i, iter) => fill(i, iter)}

The broadcast variable ends up as a list of values without nulls. That's progress, but I still can't get the mapping to work: I get nothing, because the index i doesn't map to the original data, it maps to the subset without nulls.

What am I missing here?

EDIT and solution (inferred from @zero323's answer):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag}
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)

val spec = Window.partitionBy("id").orderBy("Date")
val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load("test.csv")

// take the current value and up to 90 previous values; coalesce keeps the first non-null
val df2 = df.withColumn("test", coalesce((0 to 90).map(i => lag(df.col("test"), i, 0).over(spec)): _*))

See zero323's answer below for more options if you're using RDDs instead of DataFrames. The solution above may not be the most efficient but works for me. If you're looking to optimize, check out the RDD solution.
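
To make this concrete, here is a self-contained sketch of the same coalesce-over-lags pattern applied to the sample data above. The id column, the toDF-based construction, and the sampleMaxGap value are illustrative additions, not part of the original post:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag, col}
import sqlContext.implicits._

// Sample data from the question, with a hypothetical "id" column added so
// that the window can be partitioned (the single id "a" is illustrative).
val sample = sc.parallelize(Seq(
  ("a", "2015-06-01", Some(33)), ("a", "2015-06-02", None),
  ("a", "2015-06-03", None),     ("a", "2015-06-04", None),
  ("a", "2015-06-05", Some(22)), ("a", "2015-06-06", None),
  ("a", "2015-06-07", None)
)).toDF("id", "Date", "myValue")

val sampleSpec = Window.partitionBy("id").orderBy("Date")
val sampleMaxGap = 6  // must be at least as long as the longest run of nulls

// coalesce keeps the first non-null among the current value and its lags
val filled = sample.withColumn(
  "myValueFilled",
  coalesce((0 to sampleMaxGap).map(i => lag(col("myValue"), i).over(sampleSpec)): _*))

filled.show()

Any lag that reaches past the start of the partition is null, so coalesce simply falls through to the nearest preceding non-null value.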

Recommended answer

Initial answer (a single time series assumption):

First of all, try to avoid window functions if you cannot provide a PARTITION BY clause. Without it, all the data is moved to a single partition, so most of the time this is simply not feasible.
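
To illustrate the difference using the two window specs that appear earlier in this question (the variable names are mine):

// Every row lands in a single partition: only viable for small data sets
val globalSpec = Window.orderBy("Date")

// Scales: each id is ordered and processed independently
val perKeySpec = Window.partitionBy("id").orderBy("Date")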

What you can do is fill the gaps on an RDD using mapPartitionsWithIndex. Since you didn't provide example data or expected output, consider this pseudocode rather than a real Scala program:


  • first, let's order the DataFrame by Date and convert it to an RDD

import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD

val rows: RDD[Row] = df.orderBy($"Date").rdd


  • next, let's find the last non-null observation per partition

    // e.g. !row.isNullAt(1) for the two-column (Date, value) layout above
    def notMissing(row: Row): Boolean = ???
    
    val toCarry: scala.collection.Map[Int,Option[org.apache.spark.sql.Row]] = rows
      .mapPartitionsWithIndex{ case (i, iter) => 
        Iterator((i, iter.filter(notMissing(_)).toSeq.lastOption)) }
      .collectAsMap
    


  • and convert this map into a broadcast variable

    val toCarryBd = sc.broadcast(toCarry)
    


  • finally map over partitions once again, filling the gaps (a fleshed-out sketch follows these steps):

    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
      // If it is the beginning of partition and value is missing
      // extract value to fill from toCarryBd.value
      // Remember to correct for empty / only missing partitions
      // otherwise take last not-null from the current partition
    }
    
    val imputed: RDD[Row] = rows
      .mapPartitionsWithIndex{ case (i, iter) => fill(i, iter) } 
    


  • finally convert back to DataFrame
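
Putting the steps together, here is one possible way to flesh out the fill stub and the final conversion back to a DataFrame. It is a sketch under the assumption of the two-column (Date, value) layout from the sample data; the seeding logic and the names seed and dfForwardFilled are mine, not code from the original answer:

    // Sketch only: assumes column 0 is Date and column 1 is the value to fill.
    def fill(i: Int, iter: Iterator[Row]): Iterator[Row] = {
      // Seed with the last non-null observation from the nearest earlier
      // non-empty partition (None for the first partition, or if all earlier
      // partitions were empty / contained only missing values).
      val seed: Option[Any] =
        (0 until i).flatMap(j => toCarryBd.value(j).map(_.get(1))).lastOption

      var last: Option[Any] = seed
      iter.map { row =>
        if (row.isNullAt(1)) Row(row.get(0), last.orNull)
        else { last = Some(row.get(1)); row }
      }
    }

    val imputed: RDD[Row] = rows
      .mapPartitionsWithIndex { case (i, iter) => fill(i, iter) }

    // convert back to a DataFrame, reusing the original schema
    val dfForwardFilled = sqlContext.createDataFrame(imputed, df.schema)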

The devil is in the detail. If your data is partitioned after all, then the whole problem can be solved using groupBy. Let's assume you simply partition by a column "k" of type T, and that Date is an integer timestamp:

    def fill(iter: List[Row]): List[Row] = {
      // Just go row by row and fill with last non-empty value
      ???
    }
    
    val groupedAndSorted = df.rdd
      .groupBy(_.getAs[T]("k"))
      .mapValues(_.toList.sortBy(_.getAs[Int]("Date")))
    
    val rows: RDD[Row] = groupedAndSorted.mapValues(fill).values.flatMap(identity)
    
    val dfFilled = sqlContext.createDataFrame(rows, df.schema)
    

This way you can fill all columns at the same time.
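
A minimal sketch of what that row-by-row fill could look like, assuming the last non-null value should be carried forward in every column position (the key and Date columns are never null within a group, so they pass through unchanged); this is my own illustration, not code from the answer:

    def fill(iter: List[Row]): List[Row] = {
      val width = iter.headOption.map(_.length).getOrElse(0)
      // last non-null value seen so far, per column position
      val last = Array.fill[Any](width)(null)
      iter.map { row =>
        val values = (0 until width).map { j =>
          if (row.isNullAt(j)) last(j)
          else { last(j) = row.get(j); row.get(j) }
        }
        Row.fromSeq(values)
      }
    }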

Can this be done with DataFrames instead of converting back and forth to RDD?

It depends, although it is unlikely to be efficient. If the maximum gap is relatively small, you can do something like this:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.{WindowSpec, Window}
    import org.apache.spark.sql.Column
    
    val maxGap: Int = ???  // Maximum gap between observations
    val columnsToFill: List[String] = ???  // List of columns to fill
    val suffix: String = "_"  // To disambiguate between original and imputed columns
    val w: WindowSpec = ???  // e.g. Window.partitionBy("k").orderBy("Date")
    
    // Take lag 1 to maxGap and coalesce
    def makeCoalesce(w: WindowSpec)(maxGap: Int)(suffix: String)(c: String) = {
      // Generate lag values between 1 and maxGap
      val lags = (1 to maxGap).map(i => lag(col(c), i).over(w))
      // Add current, coalesce and set alias
      coalesce(col(c) +: lags: _*).alias(s"$c$suffix")
    }
    
    // For each column you want to fill nulls apply makeCoalesce
    val lags: List[Column] = columnsToFill.map(makeCoalesce(w)(maxGap)(suffix))
    
    // Finally select
    val dfImputed = df.select($"*" :: lags: _*)
    

It can easily be adjusted to use a different maximum gap per column, for example as sketched below.
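
For instance, a per-column gap configuration could be wired in like this (the gaps map and its contents are purely illustrative):

    // Hypothetical per-column maximum gaps
    val gaps: Map[String, Int] = Map("myValue" -> 90, "otherValue" -> 7)
    
    val lagsPerColumn: List[Column] =
      gaps.toList.map { case (c, g) => makeCoalesce(w)(g)(suffix)(c) }
    
    val dfImputedPerColumn = df.select($"*" :: lagsPerColumn: _*)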
