Spark: How to join RDDs by time range


Problem description

I have a tricky Spark problem that I just can't wrap my head around.

We have two RDDs (coming from Cassandra). RDD1 contains Actions and RDD2 contains Historic data. Both have an id on which they can be matched/joined. But the problem is that the two tables have an N:N relationship: Actions contains multiple rows with the same id, and so does Historic. Here is some example data from both tables.

Actions (time is actually a timestamp)

id  |  time  | valueX
1   |  12:05 | 500
1   |  12:30 | 500
2   |  12:30 | 125

Historic (set_at is actually a timestamp)

id  |  set_at| valueY
1   |  11:00 | 400
1   |  12:15 | 450
2   |  12:20 | 50
2   |  12:25 | 75

How can we join these two tables in a way that we get a result like this?

1   |  100  # 500 - 400 for Actions#1 at 12:05, because Historic was at 400 at that time
1   |  50   # 500 - 450 for Actions#2 at 12:30, because Historic was at 450 at that time
2   |  50   # 125 - 75  for Actions#3 at 12:30, because Historic was at 75 at that time

I can't come up with a good solution that feels right without making a lot of iterations over huge datasets. The only idea I keep coming back to is building ranges from the Historic set and then somehow checking whether each Action falls into one of those ranges, e.g. (11:00 - 12:15), to do the calculation. But that seems pretty slow to me. Is there a more efficient way to do this? It seems to me that this kind of problem should be common, but I couldn't find any hints on it yet. How would you solve this problem in Spark?

My current attempt so far (half-finished code):

case class Historic(id: String, set_at: Long, valueY: Int)
val historicRDD = sc.cassandraTable[Historic](...)

historicRDD
  .map(row => (row.id, row))
  .reduceByKey(...)
  // transforming to another case class which results in something like this; code not finished yet
  // (List((Range(0, 12:25), 400), (Range(12:25, NOW), 450)))

// From here we could join with Actions,
// and then some .filter to select the right tuple from the list
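
Roughly, the finished version of that idea might look like the sketch below. This is only a sketch: Interval is a hypothetical helper class, actionRDD and its Action(id: String, time: Long, valueX: Int) case class are assumed to exist analogously to Historic, and timestamps are assumed to be epoch seconds.

// Hypothetical completion of the idea above: per id, sort the historic rows
// and turn them into (from, until, valueY) intervals; the last one is open-ended.
case class Interval(from: Long, until: Long, valueY: Int)

val intervalsById = historicRDD
  .groupBy(_.id)
  .mapValues { rows =>
    val sorted = rows.toList.sortBy(_.set_at)
    val ends = sorted.drop(1).map(_.set_at) :+ Long.MaxValue  // "NOW"
    sorted.zip(ends).map { case (h, end) => Interval(h.set_at, end, h.valueY) }
  }

// Join the actions against the interval lists and pick the interval each action falls into.
val result = actionRDD
  .map(a => (a.id, a))
  .join(intervalsById)
  .flatMap { case (id, (action, intervals)) =>
    intervals.find(iv => iv.from <= action.time && action.time < iv.until)
      .map(iv => (id, action.valueX - iv.valueY))
  }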

Answer

It's an interesting problem. I also spent some time figuring out an approach. This is what I came up with:

Given Action(id, time, x) and Historic(id, time, y), the approach is:

  • Join actions with historic (this could be heavy)
  • Filter out all historic data that is not relevant for a given action
  • Key the result by (id, time) to discriminate the same id at different times
  • Reduce the historic records per action to the maximum, leaving us with the relevant historic record for the given action

In Spark:

val actionById = actions.keyBy(_.id)
val historyById = historic.keyBy(_.id)
// join every action with every historic record that shares its id (this can be heavy)
val actionByHistory = actionById.join(historyById)
// keep only historic records set before the action's time, keyed by (id, time)
val filteredActionByIdTime = actionByHistory.collect {
  case (_, (action, historic)) if action.time > historic.time =>
    ((action.id, action.time), (action, historic))
}
// per action, keep the latest historic record that precedes it
val topHistoricByAction = filteredActionByIdTime.reduceByKey {
  case ((a1, h1), (_, h2)) => (a1, if (h1.time > h2.time) h1 else h2)
}

// we are done, let's produce a report now
val report = topHistoricByAction.map { case ((id, time), (action, historic)) => (id, time, action.x - historic.y) }
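
For completeness, here is how the example data above could be wired in for a local test. This is my own sketch: the case classes just mirror the Action(id, time, x) / Historic(id, time, y) model, with the example times converted to seconds.

case class Action(id: Int, time: Long, x: Int)
case class Historic(id: Int, time: Long, y: Int)

val actions = sc.parallelize(Seq(
  Action(1, 43500, 500),   // 12:05
  Action(1, 45000, 500),   // 12:30
  Action(2, 45000, 125)))  // 12:30
val historic = sc.parallelize(Seq(
  Historic(1, 39600, 400),   // 11:00
  Historic(1, 44100, 450),   // 12:15
  Historic(2, 44400, 50),    // 12:20
  Historic(2, 44700, 75)))   // 12:25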

Using the data provided above, the report looks like:

report.collect
Array[(Int, Long, Int)] = Array((1,43500,100), (1,45000,50), (2,45000,50))

(I transformed the time to seconds to have a simplistic timestamp)
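
If it helps, the HH:mm values from the example tables can be converted to such second-of-day timestamps with a small helper (a sketch of mine, assuming the plain HH:mm format shown above):

// hypothetical helper: "12:05" -> 43500 (seconds since midnight)
def toSeconds(hhmm: String): Long = {
  val Array(h, m) = hhmm.split(":").map(_.toInt)
  h * 3600L + m * 60L
}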
