Apache Spark RDD 替换 [英] Apache Spark RDD substitution

查看:36
本文介绍了Apache Spark RDD 替换的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试解决一个问题,这样我就有了这样的数据集:

I'm trying to solve a problem such that I've got a dataset like this:

(1, 3)
(1, 4)
(1, 7)
(1, 2)   <-
(2, 7)   <-
(6, 6)    
(3, 7)   <-
(7, 4)   <-
...

由于 (1 -> 2)(2 -> 7),我想替换集合 (2, 7)(1, 7)类似地,(3 -> 7)(7 -> 4) 也将 (7,4) 替换为 (3, 4)

Since (1 -> 2) and (2 -> 7), I would like to replace the set (2, 7) as (1, 7) similarly, (3 -> 7) and (7 -> 4) also replace (7,4) as (3, 4)

因此,我的数据集变成了

Hence, my dataset becomes

(1, 3)
(1, 4)
(1, 7)
(1, 2)  
(1, 7)  
(6, 6)    
(3, 7)
(3, 4)
...

知道如何解决或解决这个问题吗?

Any idea how to solve or tackle this ?

谢谢

推荐答案

这个问题看起来像一个图的传递闭包,以分布式边列表的形式表示.

This problem looks like a transitive closure of a graph, represented in the form of a distributed list of edges.

与旧版 Hadoop MR 相比,Spark 的主要功能之一是 Spark 支持交互式算法.为了解决这样的图遍历问题,我们在递归函数中利用了这种能力:

One of the key features of Spark, when compared to older Hadoop MR is that Spark supports interactive algorithms. To solve a graph traversal problem like this, we exploit that capability in a recursive function:

def closure(rdd:RDD[(Int, Int)]):RDD[(Int,Int)] = {
  val transitiveValues = rdd.map(_.swap).join(rdd).filter{case (_,(x,y)) => x != y}
  if (transitiveValues.isEmpty) {
    rdd
  } else {
    val usedTransitions = transitiveValues.flatMap{case (a,(x,y)) => Seq((x,a),(a,y))}
    val newTransitions = transitiveValues.map{case (a,(x,y)) => (x,y)}
    closure(rdd.subtract(usedTransitions).union(newTransitions)).distinct
  }
}

这并不完全导致上面预期的输出,因为没有优先级的概念(隐式排序),所以 closure((1, 2),(2, 7)) = (1,7) 而不是在 (1, 2), (1, 7) 中,正如上面预期的那样.可以以额外的复杂性为代价来添加排序.此外,它不支持循环图(带循环).

This does not exactly results in the output expected above, because there's no notion of precedence (implicit ordering), so closure((1, 2),(2, 7)) = (1,7) and not in (1, 2), (1, 7) as expected above. Ordering can be added at the cost of extra complexity. Also, it does not support cyclic graphs (with loops).

此算法应仅作为针对特定内部要求进行调整的起点.

This algorithm should serve only as starting point to be tuned to the specific internal requirements.

这篇关于Apache Spark RDD 替换的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆