Filling gaps in timeseries Spark


Problem description

I have a problem dealing with time-series data. Due to power failures, some timestamps are missing from the dataset. I need to fill these gaps by adding rows; after that, I can interpolate the missing values.

Input data:

periodstart                usage
---------------------------------
2015-09-11 02:15           23000   
2015-09-11 03:15           23344   
2015-09-11 03:30           23283  
2015-09-11 03:45           23786   
2015-09-11 04:00           25039

Desired output:

periodstart                usage
---------------------------------
2015-09-11 02:15           23000   
2015-09-11 02:30           0   
2015-09-11 02:45           0   
2015-09-11 03:00           0   
2015-09-11 03:15           23344   
2015-09-11 03:30           23283   
2015-09-11 03:45           23786   
2015-09-11 04:00           25039  

For now I have fixed this with a while loop inside a dataset foreach function. The problem is that I have to collect the dataset to the driver before I can run the while loop, so this is not the right way to do it in Spark.

Can someone give me a better solution?

This is my code:

MissingMeasurementsDS.collect().foreach(row => {
  // Empty list for newly generated measurements
  val output = ListBuffer.empty[Measurement]
  // Number of missing measurements after this row
  val missingMeasurements = row.getAs[Int]("missingmeasurements")
  val lastTimestamp = row.getAs[Timestamp]("previousperiodstart")
  // Generate missing timestamps
  var i = 1
  while (i <= missingMeasurements) {
    // Increment timestamp by 15 minutes (900000 milliseconds)
    val newTimestamp = lastTimestamp.getTime + (900000L * i)
    output += Measurement(new Timestamp(newTimestamp), 0)
    i += 1
  }
  // Join interpolated measurements with correct measurements
  completeMeasurementsDS.join(output.toDS())
})
completeMeasurementsDS.show()
println("OutputDF count = " + completeMeasurementsDS.count())

Recommended answer

If the input DataFrame has the following structure:

root
 |-- periodstart: timestamp (nullable = true)
 |-- usage: long (nullable = true)

Scala

Determine the minimum / maximum:

val (minp, maxp) = df
  .select(min($"periodstart").cast("bigint"), max($"periodstart").cast("bigint"))
  .as[(Long, Long)]
  .first

Set the step, for example 15 minutes:

val step: Long = 15 * 60

Generate the reference range:

val reference = spark
  .range((minp / step) * step, ((maxp / step) + 1) * step, step)
  .select($"id".cast("timestamp").alias("periodstart"))
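The integer divisions snap the range endpoints to 15-minute boundaries before generating the grid. A quick plain-Python check of that arithmetic (the epoch values are illustrative, derived from the example data, assuming UTC):

```python
step = 15 * 60  # 900 seconds

# Illustrative epoch seconds for the example data (UTC assumed)
minp = 1441937700  # 2015-09-11 02:15
maxp = 1441944000  # 2015-09-11 04:00

# Same rounding as range((minp / step) * step, ((maxp / step) + 1) * step, step)
start = (minp // step) * step
stop = ((maxp // step) + 1) * step
grid = list(range(start, stop, step))

# 8 slots: 02:15, 02:30, ..., 04:00 -- one per row of the desired output
```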

Join and fill the gaps:

reference.join(df, Seq("periodstart"), "leftouter").na.fill(0, Seq("usage"))
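The left outer join keeps every grid timestamp, and na.fill replaces the usage of the rows with no match by 0. The same semantics sketched in plain Python (values taken from the example data, epoch seconds assumed UTC):

```python
step = 15 * 60

# Observed readings keyed by epoch second (from the example data)
usage = {
    1441937700: 23000,  # 02:15
    1441941300: 23344,  # 03:15
    1441942200: 23283,  # 03:30
    1441943100: 23786,  # 03:45
    1441944000: 25039,  # 04:00
}

grid = range(1441937700, 1441944000 + step, step)
# dict.get(t, 0) plays the role of the left outer join plus na.fill(0)
filled = [(t, usage.get(t, 0)) for t in grid]
```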

Python

Similarly in PySpark:

from pyspark.sql.functions import col, min as min_, max as max_

step = 15 * 60

minp, maxp = df.select(
    min_("periodstart").cast("long"), max_("periodstart").cast("long")
).first()

reference = spark.range(
    (minp // step) * step, ((maxp // step) + 1) * step, step
).select(col("id").cast("timestamp").alias("periodstart"))

reference.join(df, ["periodstart"], "leftouter").na.fill(0, ["usage"])
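Casting the generated long id back to a timestamp interprets it as epoch seconds. A plain-Python check of that mapping with datetime (UTC assumed; the start value is illustrative, taken from the example data):

```python
from datetime import datetime, timezone

step = 15 * 60
start = 1441937700  # 2015-09-11 02:15 UTC, from the example data

# Each grid value maps back to a wall-clock 15-minute slot
times = [datetime.fromtimestamp(start + i * step, tz=timezone.utc)
         for i in range(8)]
```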
