Spark SQL dataframe: best way to compute across row pairs


Question

I have a Spark DataFrame "deviceDF" like so:

ID    date_time            state  
a     2015-12-11 4:30:00     up  
a     2015-12-11 5:00:00     down  
a     2015-12-11 5:15:00     up  
b     2015-12-12 4:00:00     down  
b     2015-12-12 4:20:00     up  
a     2015-12-12 10:15:00    down  
a     2015-12-12 10:20:00    up  
b     2015-12-14 15:30:00    down  

I am trying to calculate the downtime for each of the IDs. I started simply by grouping on ID and separately computing the sum of all up times and the sum of all down times, then taking the difference of the two.

val downtimeDF = deviceDF.filter($"state" === "down")
  .groupBy("ID")
  .agg(sum(unix_timestamp($"date_time")) as "down_time")  

val uptimeDF = deviceDF.filter($"state" === "up")
  .groupBy("ID")
  .agg(sum(unix_timestamp($"date_time")) as "up_time")  

val updownjoinDF = uptimeDF.join(downtimeDF, "ID")  

val difftimeDF = updownjoinDF
  .withColumn("diff_time", $"up_time" - $"down_time")  

However, there are a few conditions that cause errors, such as a device going down and never coming back up; in that case, the down_time is the difference between the current time and the last time it went down.

Also, if the first entry for a particular device starts with 'up', then the down_time is the difference between that first entry and the start time of this analysis, say 2015-12-11 00:00:00. What's the best way to handle these border conditions using DataFrames? Do I need to write a custom UDAF?

Answer

The first thing you can try is to use window functions. While this is usually not the fastest possible solution, it is concise and extremely expressive. Taking your data as an example:

import org.apache.spark.sql.functions.unix_timestamp

val df = sc.parallelize(Array(
    ("a", "2015-12-11 04:30:00", "up"), ("a", "2015-12-11 05:00:00", "down"), 
    ("a", "2015-12-11 05:15:00", "up"), ("b", "2015-12-12 04:00:00", "down"), 
    ("b", "2015-12-12 04:20:00", "up"), ("a", "2015-12-12 10:15:00", "down"),
    ("a", "2015-12-12 10:20:00", "up"), ("b", "2015-12-14 15:30:00", "down")))
  .toDF("ID", "date_time", "state")
  .withColumn("timestamp", unix_timestamp($"date_time"))

Let's define an example window:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{coalesce, lag, when, sum}

val w = Window.partitionBy($"ID").orderBy($"timestamp")

and some helper columns:

val previousTimestamp = coalesce(lag($"timestamp", 1).over(w), $"timestamp")
val previousState = coalesce(lag($"state", 1).over(w), $"state")

val downtime = when(
  previousState === "down",
  $"timestamp" - previousTimestamp
).otherwise(0).alias("downtime")

val uptime = when(
  previousState === "up",
  $"timestamp" - previousTimestamp
).otherwise(0).alias("uptime")

And finally a basic query:

val upsAndDowns = df.select($"*", uptime, downtime)
upsAndDowns.show

// +---+-------------------+-----+----------+------+--------+
// | ID|          date_time|state| timestamp|uptime|downtime|
// +---+-------------------+-----+----------+------+--------+
// |  a|2015-12-11 04:30:00|   up|1449804600|     0|       0|
// |  a|2015-12-11 05:00:00| down|1449806400|  1800|       0|
// |  a|2015-12-11 05:15:00|   up|1449807300|     0|     900|
// |  a|2015-12-12 10:15:00| down|1449911700|104400|       0|
// |  a|2015-12-12 10:20:00|   up|1449912000|     0|     300|
// |  b|2015-12-12 04:00:00| down|1449889200|     0|       0|
// |  b|2015-12-12 04:20:00|   up|1449890400|     0|    1200|
// |  b|2015-12-14 15:30:00| down|1450103400|213000|       0|
// +---+-------------------+-----+----------+------+--------+
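
Tying this back to the question's goal of a total downtime per device, the helper column can then be aggregated directly. This is only a sketch building on the upsAndDowns frame above, not part of the original answer:

val totalDowntime = upsAndDowns
  .groupBy($"ID")
  .agg(sum($"downtime").alias("total_downtime"))

totalDowntime.show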

In a similar manner you can look forward, and if there are no more records in a group, you can adjust the total uptime / downtime using the current timestamp.
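
For instance, a forward-looking adjustment could use lead over the same window and fall back to the current timestamp when a device's last recorded state is "down". This is a sketch of that idea rather than code from the original answer; it reuses the window w and the upsAndDowns frame defined above:

import org.apache.spark.sql.functions.{lead, current_timestamp}

// Timestamp of the next event for the same device, if any
val nextTimestamp = lead($"timestamp", 1).over(w)

// If a device's last known state is "down", count the time from that event until now
val trailingDowntime = when(
  nextTimestamp.isNull && $"state" === "down",
  unix_timestamp(current_timestamp()) - $"timestamp"
).otherwise(0).alias("trailing_downtime")

val adjusted = upsAndDowns.select($"*", trailingDowntime)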

Window functions provide some other useful features, like window definitions with ROWS BETWEEN and RANGE BETWEEN clauses.
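
As an illustration (not taken from the original answer), a cumulative downtime per device can be expressed with a ROWS BETWEEN frame running from the start of the partition to the current row:

// Frame from the first row of the partition up to and including the current row
val runningW = Window.partitionBy($"ID").orderBy($"timestamp")
  .rowsBetween(Long.MinValue, 0)

val withRunningDowntime = upsAndDowns
  .withColumn("downtime_so_far", sum($"downtime").over(runningW))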

Another possible solution is to move your data to an RDD and use low-level operations with RangePartitioner, mapPartitions and sliding windows. For basic things you can even groupBy. This requires significantly more effort but is also much more flexible.
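
A rough sketch of the simpler groupBy variant of that route follows; it assumes the events of a single device fit in memory once grouped, and it only illustrates the pairwise bookkeeping, not the border conditions:

// Pair each event with its device, group per device, then walk consecutive event pairs
val downtimePerId = df.rdd
  .map(r => (r.getAs[String]("ID"), (r.getAs[Long]("timestamp"), r.getAs[String]("state"))))
  .groupByKey()
  .mapValues { events =>
    events.toSeq.sortBy(_._1)                                   // order events in time
      .sliding(2)                                               // consecutive event pairs
      .collect { case Seq((t1, "down"), (t2, _)) => t2 - t1 }   // time spent down between them
      .sum
  }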

Finally, there is a spark-timeseries package from Cloudera. Its documentation is close to non-existent, but the tests are comprehensive enough to give you some idea of how to use it.

Regarding custom UDAFs, I wouldn't be too optimistic. The UDAF API is rather specific and not exactly flexible.
