Apache Spark - Dealing with Sliding Windows on Temporal RDDs

Question

I've been working quite a lot with Apache Spark over the last few months, but now I have received a pretty difficult task: to compute average/minimum/maximum etcetera on a sliding window over a paired RDD where the key component is a date tag and the value component is a matrix. So each aggregation function should also return a matrix, where each cell holds the average of that cell's values across the time period.

I want to be able to say that I want the average for every 7 days, with a sliding window of one day. The window always moves by one step, in the same unit the window size is measured in (so for a 12-week window, it moves one week at a time).

My initial thought is to simply iterate: if we want an average per X days, iterate X times, and each time just group the elements by date, with an offset.

So if we have this scenario:

Days: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

Matrices: A B C D E F G H I J K L M N O

And if we want the average per 5 days, I will iterate 5 times; the groupings look like this:

First iteration:

Group 1: (1, A) (2, B) (3, C) (4, D) (5, E)

Group 2: (6, F) (7, G) (8, H) (9, I) (10, J)

Group 3: (11, K) (12, L) (13, M) (14, N) (15, O)

Second iteration:

Group 1: (2, B) (3, C) (4, D) (5, E) (6, F)

Group 2: (7, G) (8, H) (9, I) (10, J) (11, K)

Group 3: (12, L) (13, M) (14, N) (15, O)

Etcetera, and for each group, I have to do a fold/reduce procedure to get the average.

However as you might imagine, this is pretty slow and probably a rather bad way to do it. I can't really figure out any better way to do it though.
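
For reference, here is a minimal sketch of this iterate-and-group approach, assuming matrices are plain Array[Array[Double]] and days are consecutive integers starting at 1; the helper names (addM, scaleM, naiveWindowAverages) are illustrative, not from the original question:

import org.apache.spark.rdd.RDD

type Matrix = Array[Array[Double]]

// Cell-wise sum and scaling, so a cell-wise mean is scaleM(sum, 1.0 / n).
def addM(a: Matrix, b: Matrix): Matrix =
  a.zip(b).map { case (ra, rb) => ra.zip(rb).map { case (x, y) => x + y } }
def scaleM(a: Matrix, k: Double): Matrix = a.map(_.map(_ * k))

def naiveWindowAverages(rdd: RDD[(Int, Matrix)], window: Int): RDD[(Int, Matrix)] =
  (0 until window).map { offset =>
    rdd
      .filter { case (day, _) => day > offset }                  // drop the partial leading stub
      .groupBy { case (day, _) => (day - 1 - offset) / window }  // bucket days, shifted by offset
      .map { case (_, entries) =>
        val ms = entries.map(_._2).toSeq
        (entries.map(_._1).min, scaleM(ms.reduce(addM), 1.0 / ms.size)) // keyed by window start day
      }
  }.reduce(_ union _)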

Answer

If you convert to a DataFrame, this all gets a lot simpler -- you can just self-join the data back on itself and find the average. Say I have a series of data like this:

tsDF.show
date       amount
1970-01-01 10.0
1970-01-01 5.0
1970-01-01 7.0
1970-01-02 14.0
1970-01-02 13.9
1970-01-03 1.0
1970-01-03 5.0
1970-01-03 9.0
1970-01-04 9.0
1970-01-04 5.8
1970-01-04 2.8
1970-01-04 8.9
1970-01-05 8.1
1970-01-05 2.1
1970-01-05 2.78
1970-01-05 20.78
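
(For a self-contained test, such a DataFrame could be built as below; the use of sqlContext.implicits._ and toDF is an assumption about the setup, not something shown in the original answer.)

import java.sql.Date
import sqlContext.implicits._  // assumes a SQLContext named sqlContext

val tsDF = Seq(
  ("1970-01-01", 10.0), ("1970-01-01", 5.0), ("1970-01-01", 7.0),
  ("1970-01-02", 14.0), ("1970-01-02", 13.9),
  ("1970-01-03", 1.0), ("1970-01-03", 5.0), ("1970-01-03", 9.0),
  ("1970-01-04", 9.0), ("1970-01-04", 5.8), ("1970-01-04", 2.8), ("1970-01-04", 8.9),
  ("1970-01-05", 8.1), ("1970-01-05", 2.1), ("1970-01-05", 2.78), ("1970-01-05", 20.78)
).map { case (d, amt) => (Date.valueOf(d), amt) }  // parse the ISO date strings
 .toDF("date", "amount")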

This rolls up to:

tsDF.groupBy($"date").agg($"date", sum($"amount"), count($"date")).show
date       SUM(amount) COUNT(date)
1970-01-01 22.0        3
1970-01-02 27.9        2
1970-01-03 15.0        3
1970-01-04 26.5        4
1970-01-05 33.76       4

I then would need to create a UDF to shift the date for the join condition (note I am only using a 2 day window by using offset = -2):

import java.util.Calendar

// Shift a date back by two days; used in the join condition below.
def dateShift(myDate: java.sql.Date): java.sql.Date = {
  val offset = -2
  val cal = Calendar.getInstance
  cal.setTime(myDate)
  cal.add(Calendar.DATE, offset)
  new java.sql.Date(cal.getTime.getTime)
}
val udfDateShift = udf[java.sql.Date, java.sql.Date](dateShift)
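
(As an aside: on Spark 1.5 or later, the built-in date_sub function can express the same shift without a UDF. A sketch of the equivalent join predicate, assuming the same column names:)

import org.apache.spark.sql.functions.date_sub

// Same 2-day window predicate, with date_sub in place of udfDateShift:
val windowCond = $"r_date" > date_sub($"date", 2) and $"r_date" <= $"date"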

And then I could easily find a 2-day rolling average like this:

val windowDF = tsDF.select($"date")
  .groupBy($"date")
  .agg($"date")                                   // one row per distinct date
  .join(
    tsDF.select($"date" as "r_date", $"amount" as "r_amount"),
    $"r_date" > udfDateShift($"date") and $"r_date" <= $"date"   // records in the 2-day window
  )
  .groupBy($"date")
  .agg($"date", avg($"r_amount") as "2 day avg amount / record")

windowDF.show
date       2 day avg amount / record
1970-01-01 7.333333333333333
1970-01-02 9.98
1970-01-03 8.58
1970-01-04 5.928571428571429
1970-01-05 7.5325
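
As a sanity check, the 1970-01-02 value averages the five records dated 1970-01-01 and 1970-01-02: (10.0 + 5.0 + 7.0 + 14.0 + 13.9) / 5 = 49.9 / 5 = 9.98.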

While this isn't exactly what you were trying to do, you see how you can use a DataFrame self-join to extract running averages from a data set. Hope you found this helpful.
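
For completeness: on Spark 1.4 or later, SQL window functions can express the same rolling average without a self-join. A minimal sketch, assuming the date column can be cast through timestamp to epoch seconds:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, col}

// A range frame over epoch seconds: -86400 to 0 covers the previous day and
// the current day. This yields one value per input row (identical within a
// date), and Window.orderBy without partitionBy pulls all rows to one partition.
val w = Window
  .orderBy(col("date").cast("timestamp").cast("long"))
  .rangeBetween(-86400L, 0L)

val rolling = tsDF.withColumn("2 day avg amount / record", avg(col("amount")).over(w))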
