Spark Multiple Joins


Problem Description

Using a SparkContext, I would like to perform multiple joins between RDDs, where the number of RDDs to be joined should be dynamic. I would like the result to be unfolded, for example:

val rdd1  = sc.parallelize(List((1,1.0),(11,11.0), (111,111.0)))
val rdd2  = sc.parallelize(List((1,2.0),(11,12.0), (111,112.0)))
val rdd3  = sc.parallelize(List((1,3.0),(11,13.0), (111,113.0)))
val rdd4  = sc.parallelize(List((1,4.0),(11,14.0), (111,114.0)))
val rdd11 = rdd1.join(rdd2).join(rdd3).join(rdd4)
rdd11.foreach(println)

This generates the following output:

(11,(((11.0,12.0),13.0),14.0))
(111,(((111.0,112.0),113.0),114.0))
(1,(((1.0,2.0),3.0),4.0))

I would like to:

  1. Unfold the values, e.g., the first line should read: (11, 11.0, 12.0, 13.0, 14.0).
  2. Do it dynamically, so that it works for any number of RDDs to be joined.

Any ideas would be appreciated.

Recommended Answer

Instead of using join, I would use union followed by groupByKey to achieve what you want.

Here is what I would do:

val emptyRdd = sc.emptyRDD[(Int, Double)]           // neutral starting value for the fold
val listRdds = List(rdd1, rdd2, rdd3, rdd4)        // satisfy your dynamic number of rdds
val unioned  = listRdds.fold(emptyRdd)(_.union(_)) // one RDD holding every (key, value) pair
val grouped  = unioned.groupByKey()                // gather all values that share a key
grouped.collect().foreach(println(_))

This produces the following result:

(1,CompactBuffer(1.0, 2.0, 3.0, 4.0))
(11,CompactBuffer(11.0, 12.0, 13.0, 14.0))
(111,CompactBuffer(111.0, 112.0, 113.0, 114.0))
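Note that union followed by groupByKey is not equivalent to a chain of joins when the inputs do not share exactly the same keys. join is an inner join, so chaining it keeps only keys present in every RDD, whereas union plus groupByKey keeps every key that appears in any RDD. Spark also does not guarantee the order of values within each group. The sketch below models both behaviours with plain Scala collections so it runs without a cluster: Seq[(Int, Double)] stands in for RDD[(Int, Double)] (an assumption for illustration), and JoinVsGroupModel, unionGroup, innerJoin and foldJoin are hypothetical names. With the inputs in main, unionGroup keeps key 111 with the single value 112.0, while foldJoin drops it.

```scala
// Plain-Scala model: Seq[(K, V)] stands in for RDD[(K, V)] (illustrative assumption, no Spark).
object JoinVsGroupModel {
  type Pairs = Seq[(Int, Double)]

  // Models listRdds.fold(emptyRdd)(_.union(_)).groupByKey(): every key from any input survives
  def unionGroup(inputs: List[Pairs]): Map[Int, Seq[Double]] =
    inputs.foldLeft(Seq.empty[(Int, Double)])(_ ++ _)
      .groupBy(_._1)
      .map { case (k, kvs) => k -> kvs.map(_._2) }

  // Models RDD.join: an inner join that keeps only keys present on both sides
  def innerJoin[A](left: Seq[(Int, A)], right: Pairs): Seq[(Int, (A, Double))] =
    for {
      (k1, a) <- left
      (k2, d) <- right
      if k1 == k2
    } yield (k1, (a, d))

  // Models folding join over the list, appending each joined value to an accumulator
  def foldJoin(inputs: List[Pairs]): Map[Int, Vector[Double]] = inputs match {
    case head :: tail =>
      tail.foldLeft(head.map { case (k, v) => (k, Vector(v)) }) { (acc, next) =>
        innerJoin(acc, next).map { case (k, (vs, d)) => (k, vs :+ d) }
      }.toMap
    case Nil => Map.empty
  }

  def main(args: Array[String]): Unit = {
    val a = Seq((1, 1.0), (11, 11.0))
    val b = Seq((1, 2.0), (11, 12.0), (111, 112.0)) // key 111 is missing from a
    println(unionGroup(List(a, b)))
    println(foldJoin(List(a, b)))
  }
}
```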

Update

If you would still like to use join, here is how to do it with a somewhat more involved foldLeft:

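val joined = listRdds match {
  case head :: tail =>
    // Wrap each value of the first RDD in a one-element array, then join the
    // remaining RDDs one at a time, appending each newly joined value
    tail.foldLeft(head.mapValues(Array(_)))(_.join(_).mapValues {
      case (arr: Array[Double], d: Double) => arr :+ d
    })
  case Nil => sc.emptyRDD[(Int, Array[Double])] // no inputs, nothing to join
}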

joined.collect produces:

res14: Array[(Int, Array[Double])] = Array((1,Array(1.0, 2.0, 3.0, 4.0)), (11,Array(11.0, 12.0, 13.0, 14.0)), (111,Array(111.0, 112.0, 113.0, 114.0)))
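If you want join's semantics (only keys present in every input, values in input order) while grouping only once, one option is to tag each value with the index of the input it came from before grouping. Below is a minimal sketch of that idea, again on plain Scala collections standing in for RDDs. TaggedGroupJoin and joinAll are hypothetical names, and the sketch assumes each key occurs at most once per input (join would instead emit every combination of duplicate keys). Because Scala tuple arity is fixed at compile time, a key paired with a Vector[Double] is used as the closest typed form of the requested (11, 11.0, 12.0, 13.0, 14.0).

```scala
// Plain-Scala model: Seq stands in for RDD (illustrative assumption, no Spark).
// Assumes each key appears at most once in each input.
object TaggedGroupJoin {
  type Pairs = Seq[(Int, Double)]

  def joinAll(inputs: List[Pairs]): Map[Int, Vector[Double]] = {
    val n = inputs.size
    // Tag every value with the position of the input it came from (models rdd.map)
    val tagged: Seq[(Int, (Int, Double))] =
      inputs.zipWithIndex.flatMap { case (pairs, idx) =>
        pairs.map { case (k, v) => (k, (idx, v)) }
      }
    // Group by key (models sc.union(...).groupByKey())
    val grouped: Map[Int, Seq[(Int, Double)]] =
      tagged.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    // Keep only keys seen in every input, then restore input order using the tag
    grouped.collect {
      case (k, tvs) if tvs.map(_._1).distinct.size == n =>
        k -> tvs.sortBy(_._1).map(_._2).toVector
    }
  }
}
```

In Spark the corresponding steps would roughly be map on each RDD to add the tag, sc.union to combine the list, groupByKey, and then filter and mapValues to check and reorder each group.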
