How to use Spark intersection() by key or filter() with two RDDs?


Problem description


I want to use intersection() by key or filter() in spark.

But I really don't know how to use intersection() by key.

So I tried to use filter(), but it didn't work.

Example - here are two RDDs:

data1 //RDD[(String, Int)] = Array(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1))
data2 //RDD[(String, Int)] = Array(("a", 3), ("b", 5))

val data3 = data2.map{_._1}

data1.filter{_._1 == data3}.collect //Array[(String, Int)] = Array()

I want to get the (key, value) pairs from data1 whose keys also appear in data2.

Array(("a", 1), ("a", 2), ("b", 2), ("b", 3)) is the result I want.

Is there a method to solve this problem using intersection() by key or filter()?
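The filter attempt above returns an empty array because data3 is an RDD[String], so _._1 == data3 compares each String key against a whole RDD object and is never true (and an RDD cannot be referenced inside another RDD's transformation anyway). What is actually wanted is key membership. A minimal sketch of that semi-join logic on plain Scala collections (standing in for the RDDs; not Spark code):

```scala
// Plain Scala stand-ins for the two RDDs
val data1 = Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1))
val data2 = Seq(("a", 3), ("b", 5))

// Collect data2's keys into a Set, then filter data1 by key membership
val keys = data2.map(_._1).toSet // Set(a, b)
val result = data1.filter { case (k, _) => keys.contains(k) }
// List((a,1), (a,2), (b,2), (b,3))
```

The answers below apply this same idea at RDD scale.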

Solution

I tried to improve your solution using a broadcast variable in filter():

val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))

// broadcast data2 key list to use in filter method, which runs in executor nodes
val bcast = sc.broadcast(data2.map(_._1).collect())

val result = data1.filter(r => bcast.value.contains(r._1))


println(result.collect().toList)
//Output
List((a,1), (a,2), (b,2), (b,3))

Edit1: (as per the comment, to address scalability without using collect())

val data1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))

val cogroupRdd: RDD[(String, (Iterable[Int], Iterable[Int]))] = data1.cogroup(data2)
/* List(
  (a, (CompactBuffer(1, 2), CompactBuffer(3))),
  (b, (CompactBuffer(2, 3), CompactBuffer(5))),
  (c, (CompactBuffer(1), CompactBuffer()))
) */

//Now filter keys which have two non-empty CompactBuffers. You can also do
//that with filter(row => row._2._1.nonEmpty && row._2._2.nonEmpty).
val filterRdd = cogroupRdd.filter { case (k, (v1, v2)) => v1.nonEmpty && v2.nonEmpty } 
/* List(
  (a, (CompactBuffer(1, 2), CompactBuffer(3))),
  (b, (CompactBuffer(2, 3), CompactBuffer(5)))
) */

//As we only care about data1's values, pick the first CompactBuffer
//by doing v1.map(val1 => (k, val1))
val result = filterRdd.flatMap { case (k, (v1, v2)) => v1.map(val1 => (k, val1)) }
//List((a, 1), (a, 2), (b, 2), (b, 3))
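For intuition, the cogroup-then-filter steps above can be mimicked on plain Scala collections (a sketch of the semantics only; the real code runs on RDDs):

```scala
val data1 = Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1))
val data2 = Seq(("a", 3), ("b", 5))

// cogroup ~ group each side by key, then pair up the value lists per key
val g1 = data1.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }
val g2 = data2.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }
val cogrouped = (g1.keySet ++ g2.keySet).toList.map { k =>
  (k, (g1.getOrElse(k, Nil), g2.getOrElse(k, Nil)))
}

// Keep only keys with values on both sides, then emit data1's values
val result = cogrouped
  .filter { case (_, (v1, v2)) => v1.nonEmpty && v2.nonEmpty }
  .flatMap { case (k, (v1, _)) => v1.map(v => (k, v)) }
  .sorted
// List((a,1), (a,2), (b,2), (b,3))
```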

Edit2:

val resultRdd = data1.join(data2).map(r => (r._1, r._2._1)).distinct()
//List((b,2), (b,3), (a,2), (a,1)) 

Here data1.join(data2) holds pairs with common keys (inner join):

//List((a,(1,3)), (a,(2,3)), (b,(2,5)), (b,(3,5)))
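The same inner-join-and-project logic, sketched on plain Scala collections (semantics only; distinct matters because a key with several matches in data2 would otherwise emit duplicate data1 pairs):

```scala
val data1 = Seq(("a", 1), ("a", 2), ("b", 2), ("b", 3), ("c", 1))
val data2 = Seq(("a", 3), ("b", 5))

// Inner join on the key
val joined = for {
  (k1, v1) <- data1
  (k2, v2) <- data2
  if k1 == k2
} yield (k1, (v1, v2))
// List((a,(1,3)), (a,(2,3)), (b,(2,5)), (b,(3,5)))

// Project back to data1's pairs and drop duplicates
val result = joined.map { case (k, (v1, _)) => (k, v1) }.distinct
// List((a,1), (a,2), (b,2), (b,3))
```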
