星火:什么是加盟一个2元组键RDD用单键RDD最好的策略? [英] Spark: what's the best strategy for joining a 2-tuple-key RDD with single-key RDD?
问题描述
我有两个RDD的,我想加入,他们是这样的:
I have two RDD's that I want to join and they look like this:
val rdd1:RDD[(T,U)]
val rdd2:RDD[((T,W), V)]
这恰好是 RDD1集
的键值是唯一的情况下,也即的元组键值 RDD2
是唯一的。我想加入这两个数据集,因此我得到以下RDD:
It happens to be the case that the key values of rdd1
are unique and also that the tuple-key values of rdd2
are unique. I'd like to join the two data sets so that I get the following rdd:
val rdd_joined:RDD[((T,W), (U,V))]
什么是最有效的方式来实现这一目标?下面是我想过的一些想法。
What's the most efficient way to achieve this? Here are a few ideas I've thought of.
选项1:
val m = rdd1.collectAsMap
val rdd_joined = rdd2.map({case ((t,w), u) => ((t,w), u, m.get(t))})
选项2:
val distinct_w = rdd2.map({case ((t,w), u) => w}).distinct
val rdd_joined = rdd1.cartesian(distinct_w).join(rdd2)
选项1将收集的所有数据的掌握,对不对?所以这似乎不是一个很好的选择,如果RDD1集大(这是在我的情况比较大,虽然比RDD2小一个数量顺序)。选项2做一个丑陋的独特和笛卡尔积,这也似乎非常低效的。掠过我的头脑(但还没有尝试过),另一种可能性是做选择1和广播地图,虽然这将是更好的一个聪明的方式播出,使得地图的钥匙共同位于同的 RDD2
键。
有没有人碰到这种情况之前?我很高兴有你的看法。
Has anyone come across this sort of situation before? I'd be happy to have your thoughts.
谢谢!
推荐答案
一个选项是通过收集 RDD1集
来驱动,并广播到所有映射器进行广播加盟;做得正确,这将让我们避免大 RDD2
RDD的一个昂贵的洗牌:
One option is to perform a broadcast join by collecting rdd1
to the driver and broadcasting it to all mappers; done correctly, this will let us avoid an expensive shuffle of the large rdd2
RDD:
val rdd1 = sc.parallelize(Seq((1, "A"), (2, "B"), (3, "C")))
val rdd2 = sc.parallelize(Seq(((1, "Z"), 111), ((1, "ZZ"), 111), ((2, "Y"), 222), ((3, "X"), 333)))
val rdd1Broadcast = sc.broadcast(rdd1.collectAsMap())
val joined = rdd2.mapPartitions({ iter =>
val m = rdd1Broadcast.value
for {
((t, w), u) <- iter
if m.contains(t)
} yield ((t, w), (u, m.get(t).get))
}, preservesPartitioning = true)
的 preservesPartitioning = TRUE
告诉星火此映射函数不会修改 RDD2
的钥匙;这将允许星火,以避免重新分区 RDD2
该联接基于任何后续操作的(T,W)
关键的。
The preservesPartitioning = true
tells Spark that this map function doesn't modify the keys of rdd2
; this will allow Spark to avoid re-partitioning rdd2
for any subsequent operations that join based on the (t, w)
key.
因为它需要在驱动程序中的通信瓶颈,这可能广播是低效的。原则上,有可能向广播一条RDD到另一个而不涉及驱动器;我有一个原型的这一点,我想概括和增加的火花。
This broadcast could be inefficient since it involves a communications bottleneck at the driver. In principle, it's possible to broadcast one RDD to another without involving the driver; I have a prototype of this that I'd like to generalize and add to Spark.
另一个选择是重新映射 RDD2
的按键和使用Spark 加入
方法;这将涉及的全部洗牌 RDD2
(也可能是 RDD1集
)
Another option is to re-map the keys of rdd2
and use the Spark join
method; this will involve a full shuffle of rdd2
(and possibly rdd1
):
rdd1.join(rdd2.map {
case ((t, w), u) => (t, (w, u))
}).map {
case (t, (v, (w, u))) => ((t, w), (u, v))
}.collect()
在我的样本输入,这两种方法产生相同的结果:
On my sample input, both of these methods produce the same result:
res1: Array[((Int, java.lang.String), (Int, java.lang.String))] = Array(((1,Z),(111,A)), ((1,ZZ),(111,A)), ((2,Y),(222,B)), ((3,X),(333,C)))
第三个选择是重组 RDD2
让 T
是其关键,那么执行上述加盟。
A third option would be to restructure rdd2
so that t
is its key, then perform the above join.
这篇关于星火:什么是加盟一个2元组键RDD用单键RDD最好的策略?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!