How to use Spark intersection() by key or filter() with two RDDs?
Problem description
I want to use intersection() by key or filter() in Spark, but I really don't know how to use intersection() by key. So I tried to use filter(), but it didn't work.
Example: here are two RDDs:
data1 //RDD[(String, Int)] = Array(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1))
data2 //RDD[(String, Int)] = Array(("a", 3), ("b", 5))
val data3 = data2.map{_._1}
data1.filter{_._1 == data3}.collect //Array[(String, Int)] = Array()
Based on the keys that data2 has, I want to get the (key, value) pairs of data1 that have those same keys.
Array(("a", 1), ("a", 2), ("b", 2), ("b", 3))
is the result I want.
Is there a method to solve this problem using intersection() by key or filter()?
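In the attempt above, _._1 == data3 compares each String key with the RDD data3 itself, so it is always false; Spark also does not support using one RDD inside another RDD's transformation. intersection() does not help directly either, because it compares whole elements: data1.intersection(data2) keeps only tuples that occur in both RDDs, and none do here. A minimal sketch of an intersection by key, assuming spark-core 3.x (Scala 2.12/2.13) is on the classpath and a local master: intersect the key sets, then join the common keys back to data1. The names commonKeys, byKey, keysOut and out are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: spark-core 3.x is on the classpath; a local master is used for illustration.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("intersection-by-key-sketch"))

val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))

// intersection() compares whole (key, value) tuples: none are shared, so this is empty.
val wholeTuples = data1.intersection(data2).collect()

// Intersect the keys instead; intersection() also removes duplicates, giving "a" and "b".
val commonKeys = data1.keys.intersection(data2.keys)

// Join the common keys back to data1 (Unit is a placeholder value) and keep data1's values.
val byKey = data1.join(commonKeys.map(k => (k, ()))).mapValues(_._1)

val keysOut = commonKeys.collect().sorted
val out = byKey.collect().sorted
sc.stop()

println(keysOut.toList) // List(a, b)
println(out.toList)     // List((a,1), (a,2), (b,2), (b,3))
```

Both intersection() and join() shuffle data, so this is mainly useful when data2's key set is too large to collect on the driver.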
I tried to improve your solution by using a broadcast variable in filter():
val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))
// broadcast data2 key list to use in filter method, which runs in executor nodes
val bcast = sc.broadcast(data2.map(_._1).collect())
val result = data1.filter(r => bcast.value.contains(r._1))
println(result.collect().toList)
//Output
List((a,1), (a,2), (b,2), (b,3))
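A possible variation on the broadcast approach, sketched assuming spark-core 3.x is on the classpath and a local master: deduplicate data2's keys on the cluster and broadcast them as a Set, so each contains() call inside filter() is a hash lookup rather than a linear scan of an Array. Like the version above, it still collects every distinct key to the driver, so it suits a data2 with a modest number of keys. The names keySet, bKeys and out are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: spark-core 3.x is on the classpath; a local master is used for illustration.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("broadcast-set-sketch"))

val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))

// Deduplicate keys on the cluster, then collect them into a Set on the driver.
val keySet: Set[String] = data2.keys.distinct().collect().toSet

// Ship the Set to each executor once instead of with every task closure.
val bKeys = sc.broadcast(keySet)

// Each partition is filtered locally against the broadcast Set.
val result = data1.filter { case (k, _) => bKeys.value.contains(k) }

val out = result.collect().sorted
sc.stop()

println(out.toList) // List((a,1), (a,2), (b,2), (b,3))
```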
Edit 1 (as per the comments, to address scalability without using collect()):
import org.apache.spark.rdd.RDD
val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))
val cogroupRdd: RDD[(String, (Iterable[Int], Iterable[Int]))] = data1.cogroup(data2)
/* List(
(a, (CompactBuffer(1, 2), CompactBuffer(3))),
(b, (CompactBuffer(2, 3), CompactBuffer(5))),
(c, (CompactBuffer(1), CompactBuffer()))
) */
// Now keep only the keys that have two non-empty CompactBuffers. You can also do that with
// filter(row => row._2._1.nonEmpty && row._2._2.nonEmpty).
val filterRdd = cogroupRdd.filter { case (k, (v1, v2)) => v1.nonEmpty && v2.nonEmpty }
/* List(
(a, (CompactBuffer(1, 2), CompactBuffer(3))),
(b, (CompactBuffer(2, 3), CompactBuffer(5)))
) */
// Since we only care about data1's values, keep only the first CompactBuffer
// and pair each value with its key: v1.map(val1 => (k, val1))
val result = filterRdd.flatMap { case (k, (v1, v2)) => v1.map(val1 => (k, val1)) }
//List((a, 1), (a, 2), (b, 2), (b, 3))
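The separate filter and flatMap steps of the cogroup approach can also be folded into a single flatMap over the cogrouped RDD. A sketch, assuming spark-core 3.x is on the classpath and a local master: a key missing from data2 has an empty second buffer and emits nothing, and an empty first buffer maps to nothing as well, so no explicit v1.nonEmpty check is needed. The name out is illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Assumption: spark-core 3.x is on the classpath; a local master is used for illustration.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("cogroup-flatmap-sketch"))

val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))

// Emit data1's values only for keys that also have values in data2.
val result: RDD[(String, Int)] = data1.cogroup(data2).flatMap {
  case (k, (v1, v2)) =>
    if (v2.nonEmpty) v1.map(v => (k, v)) else Iterable.empty[(String, Int)]
}

val out = result.collect().sorted
sc.stop()

println(out.toList) // List((a,1), (a,2), (b,2), (b,3))
```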
Edit2:
val resultRdd = data1.join(data2).map(r => (r._1, r._2._1)).distinct()
//List((b,2), (b,3), (a,2), (a,1))
Here, data1.join(data2) is an inner join, so it holds only the pairs whose key appears in both RDDs:
//List((a,(1,3)), (a,(2,3)), (b,(2,5)), (b,(3,5)))
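join() emits one row per matching pair, so if a key occurred more than once in data2, every matching data1 row would be repeated; distinct() on the output removes those repeats, but it would also merge pairs that are genuinely duplicated in data1. A hedged alternative sketch, assuming spark-core 3.x is on the classpath and a local master: reduce data2 to one marker per key before joining (a semi-join), so no distinct() is needed afterwards. The data2 below is a hypothetical variant with a repeated key "b", used only to show the effect; naiveRows, data2Keys and out are illustrative names.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: spark-core 3.x is on the classpath; a local master is used for illustration.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("semi-join-sketch"))

val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
// Hypothetical variant of data2 in which key "b" occurs twice.
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5), ("b", 1)))

// A plain inner join pairs every match: 2 rows for "a" plus 2 * 2 rows for "b" = 6.
val naiveRows = data1.join(data2).count()

// Semi-join: one Unit marker per key of data2, so each data1 row matches at most once.
val data2Keys = data2.keys.distinct().map(k => (k, ()))
val result = data1.join(data2Keys).mapValues(_._1)

val out = result.collect().sorted
sc.stop()

println(naiveRows)  // 6
println(out.toList) // List((a,1), (a,2), (b,2), (b,3))
```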