Comparing Subsets of an RDD


Question

I’m looking for a way to compare subsets of an RDD intelligently.

Let's say I have an RDD with key/value pairs of type (Int -> T). I eventually need to say "compare all values of key 1 with all values of key 2, and compare the values of key 3 to the values of key 5 and key 7". How would I go about doing this efficiently?
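
For concreteness, here is a hypothetical instance of the data and the required comparisons (T = String and the values are made up purely for illustration):

// Input RDD[(Int, String)]:
//   (1, "a1"), (1, "a2"), (2, "b1"), (3, "c1"), (3, "c2"), (5, "d1"), (7, "e1")
// Comparisons needed: key 1 vs key 2, key 3 vs key 5, key 3 vs key 7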

The way I'm currently thinking of doing it is by creating a List of filtered RDDs and then using RDD.cartesian():

import org.apache.spark.rdd.RDD

def filterSubset[T](b: Int, r: RDD[(Int, T)]): RDD[(Int, T)] =
  r.filter { case (name, _) => name == b }

val keyPairs: Seq[(Int, Int)] = ???   // all key pairs to compare

// r is the input RDD[(Int, T)]
val rddPairs = keyPairs.map { case (a, b) =>
  filterSubset(a, r).cartesian(filterSubset(b, r))
}

// rddPairs.map { whatever I want to compare … }

I would then iterate the list and perform a map on each of the RDDs of pairs to gather the relational data that I need.

What I can't tell about this idea is whether it would be extremely inefficient to set up possibly hundreds of map jobs and then iterate through them. In this case, would Spark's lazy evaluation optimize the data shuffling between all of the maps? If not, can someone please recommend a possibly more efficient way to approach this issue?

Thanks for your help.

Answer

One way you can approach this problem is to replicate and partition your data to reflect the key pairs you want to compare. Let's start by creating two maps from the actual keys to the temporary keys we'll use for replication and joins:

// Map each actual key to the indices of the pairs it appears in;
// the pair index serves as the temporary key.
def genMap(keys: Seq[Int]) = keys
  .zipWithIndex.groupBy(_._1)
  .map{case (k, vs) => (k -> vs.map(_._2))}

val left = genMap(keyPairs.map(_._1))
val right = genMap(keyPairs.map(_._2))
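
For illustration, assuming the hypothetical keyPairs = Seq((1, 2), (3, 5), (3, 7)) from the question (compare 1 with 2, and 3 with 5 and 7), the temporary key is simply the index of the pair being compared:

// With keyPairs = Seq((1, 2), (3, 5), (3, 7)):
//   left  == Map(1 -> Seq(0), 3 -> Seq(1, 2))            // key 3 is the left side of pairs 1 and 2
//   right == Map(2 -> Seq(0), 5 -> Seq(1), 7 -> Seq(2))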

Next we can transform the data by replicating each record under its new keys:

import scala.reflect.ClassTag

// Emit one copy of every record for each temporary key assigned to its original key.
def mapAndReplicate[T: ClassTag](rdd: RDD[(Int, T)], map: Map[Int, Seq[Int]]) = {
  rdd.flatMap{case (k, v) => map.getOrElse(k, Seq()).map(x => (x, (k, v)))}
}

val leftRDD = mapAndReplicate(rddPairs, left)
val rightRDD = mapAndReplicate(rddPairs, right)
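
Continuing the hypothetical example, a record (3, v) is emitted into leftRDD under temporary keys 1 and 2 (once for each pair whose left key is 3), while a record (5, w) ends up in rightRDD only under temporary key 1, so the two meet exactly where the pair (3, 5) is evaluated.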

Finally we can cogroup:

val cogrouped = leftRDD.cogroup(rightRDD)
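
For each temporary key (that is, for each pair of original keys being compared) this groups the replicated left values with the replicated right values, giving an RDD[(Int, (Iterable[(Int, T)], Iterable[(Int, T)]))].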

Then we can compare and filter the pairs:

cogrouped.values.flatMap{case (xs, ys) => for {
  (kx, vx) <- xs
  (ky, vy) <- ys
  if cosineSimilarity(vx, vy) <= threshold
} yield ((kx, vx), (ky, vy)) }
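
Here cosineSimilarity and threshold are stand-ins for whatever comparison the original problem actually needs; a minimal sketch of such a placeholder, assuming the values are Array[Double] vectors, could look like this:

// Placeholder comparison, assuming T = Array[Double]; swap in your own logic.
val threshold = 0.8

def cosineSimilarity(x: Array[Double], y: Array[Double]): Double = {
  val dot   = x.zip(y).map { case (a, b) => a * b }.sum
  val normX = math.sqrt(x.map(v => v * v).sum)
  val normY = math.sqrt(y.map(v => v * v).sum)
  if (normX == 0.0 || normY == 0.0) 0.0 else dot / (normX * normY)
}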

Obviously, in its current form this approach is limited. It assumes that the values for an arbitrary pair of keys can fit into memory, and it requires a significant amount of network traffic. Still, it should give you some idea of how to proceed.

Another possible approach is to store the data in an external system (for example, a database) and fetch the required key-value pairs on demand.

Since you're trying to find the similarity between elements, I would also consider a completely different approach. Instead of naively comparing key by key, I would try to partition the data using a custom partitioner that reflects the expected similarity between documents. It is far from trivial in general, but it should give much better results.
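
As a rough sketch of what such a partitioner could look like (the bucketing function bucketOf is hypothetical, e.g. an LSH-style signature; only the general idea comes from the answer above):

import org.apache.spark.Partitioner

// Assumed helper: maps a value to a similarity bucket (e.g. an LSH signature).
// This is a made-up placeholder, not part of the original answer.
def bucketOf[T](value: T): Int = ???

// Send records whose values fall in the same similarity bucket to the same partition.
class SimilarityPartitioner(override val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = key match {
    case bucket: Int => ((bucket % numPartitions) + numPartitions) % numPartitions
    case _           => 0
  }
}

// Usage sketch: re-key by bucket, then partition so that likely-similar
// documents are compared within a partition rather than across the cluster.
val byBucket = rddPairs
  .map { case (k, v) => (bucketOf(v), (k, v)) }
  .partitionBy(new SimilarityPartitioner(100))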
