How to JOIN 3 RDD's using Spark Scala


Question


I want to join 3 tables using Spark RDDs. I achieved my objective using Spark SQL, but when I tried to join them using RDDs I did not get the desired results. Below is my query using Spark SQL and its output:

scala> actorDF.as("df1").join(movieCastDF.as("df2"),$"df1.act_id"===$"df2.act_id").join(movieDF.as("df3"),$"df2.mov_id"===$"df3.mov_id").
filter(col("df3.mov_title")==="Annie Hall").select($"df1.act_fname",$"df1.act_lname",$"df2.role").show(false)
+---------+---------+-----------+                                               
|act_fname|act_lname|role       |
+---------+---------+-----------+
|Woody    |Allen    |Alvy Singer|
+---------+---------+-----------+


Now I create paired RDDs for the three datasets, as below:

scala> val actPairedRdd=actRdd.map(_.split("\t",-1)).map(p=>(p(0),(p(1),p(2),p(3))))

scala> actPairedRdd.take(5).foreach(println)

(101,(James,Stewart,M))
(102,(Deborah,Kerr,F))
(103,(Peter,OToole,M))
(104,(Robert,De Niro,M))
(105,(F. Murray,Abraham,M))

scala> val movieCastPairedRdd=movieCastRdd.map(_.split("\t",-1)).map(p=>(p(0),(p(1),p(2))))
movieCastPairedRdd: org.apache.spark.rdd.RDD[(String, (String, String))] = MapPartitionsRDD[318] at map at <console>:29

scala> movieCastPairedRdd.foreach(println)
(101,(901,John Scottie Ferguson))
(102,(902,Miss Giddens))
(103,(903,T.E. Lawrence))
(104,(904,Michael))
(105,(905,Antonio Salieri))
(106,(906,Rick Deckard))


scala> val moviePairedRdd=movieRdd.map(_.split("\t",-1)).map(p=>(p(0),(p(1),p(2),p(3),p(4),p(5),p(6))))
moviePairedRdd: org.apache.spark.rdd.RDD[(String, (String, String, String, String, String, String))] = MapPartitionsRDD[322] at map at <console>:29

scala> moviePairedRdd.take(2).foreach(println)
(901,(Vertigo,1958,128,English,1958-08-24,UK))
(902,(The Innocents,1961,100,English,1962-02-19,SW))  


Here actPairedRdd and movieCastPairedRdd are linked with each other, and movieCastPairedRdd and moviePairedRdd are linked, since each pair shares a common column.
Now when I join all three datasets I am not getting any data:

scala> actPairedRdd.join(movieCastPairedRdd).join(moviePairedRdd).take(2).foreach(println)  


I am getting blank records, so where am I going wrong? Thanks in advance.

Answer


JOINs like this with RDDs are painful; that's another reason why DFs are nicer.


You get no data because for a pair RDD = (K, V), the K part of the last RDD has nothing in common with the keys of the first join's result. The keys 101, 102 will join with each other, but there is no commonality with 901, 902. You need to shift things around, like this, in my more limited example:

val rdd1 = sc.parallelize(Seq(
           (101,("James","Stewart","M")),
           (102,("Deborah","Kerr","F")),
           (103,("Peter","OToole","M")),
           (104,("Robert","De Niro","M")) 
           ))

val rdd2 = sc.parallelize(Seq(
           (101,(901,"John Scottie Ferguson")),
           (102,(902,"Miss Giddens")),
           (103,(903,"T.E. Lawrence")),
           (104,(904,"Michael"))
           ))

val rdd3 = sc.parallelize(Seq(
          (901,("Vertigo",1958 )),
          (902,("The Innocents",1961)) 
          ))

val rdd4 = rdd1.join(rdd2)

val new_rdd4 = rdd4.keyBy(x => x._2._2._1)  // Redefine Key for join with rdd3
val rdd5 = rdd3.join(new_rdd4)
rdd5.collect

Returns:

res14: Array[(Int, ((String, Int), (Int, ((String, String, String), (Int, String)))))] = Array((901,((Vertigo,1958),(101,((James,Stewart,M),(901,John Scottie Ferguson))))), (902,((The Innocents,1961),(102,((Deborah,Kerr,F),(902,Miss Giddens))))))


You will need to strip out the data via a map; I leave that to you. The join is an INNER join by default.
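As a hedged sketch of that final "strip out" step: the elements of rdd5 have the nested shape shown in the collect output above, so a single pattern match in a map can pull out just (act_fname, act_lname, role). The code below runs the same pattern match on a plain Scala Seq holding the two collected rows (so it works without a SparkContext); on a cluster the identical case clause goes inside rdd5.map { ... }.

```scala
// Two rows with the same nested shape as rdd5.collect:
// (mov_id, ((title, year), (act_id, ((fname, lname, gender), (mov_id, role)))))
val rdd5Rows = Seq(
  (901, (("Vertigo", 1958), (101, (("James", "Stewart", "M"), (901, "John Scottie Ferguson"))))),
  (902, (("The Innocents", 1961), (102, (("Deborah", "Kerr", "F"), (902, "Miss Giddens")))))
)

// Destructure each nested tuple and keep only the actor name and role.
val stripped = rdd5Rows.map {
  case (_, ((_, _), (_, ((fname, lname, _), (_, role))))) => (fname, lname, role)
}

stripped.foreach(println)
// (James,Stewart,John Scottie Ferguson)
// (Deborah,Kerr,Miss Giddens)
```

The same map would also be where you filter on the movie title (e.g. keep only rows whose title field is "Annie Hall") to reproduce the Spark SQL result from the question.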

