How to reverse the result of reduceByKey using RDD API?


Problem Description

I have an RDD of (key, value) that I transformed into an RDD of (key, List(value1, value2, value3)) as follows.

val rddInit = sc.parallelize(List((1, 2), (1, 3), (2, 5), (2, 7), (3, 10)))
// collect all the values for each key into a single List
val rddReduced = rddInit.groupByKey.mapValues(_.toList)
rddReduced.take(3).foreach(println)

This code gives me the following RDD: (1,List(2, 3)) (2,List(5, 7)) (3,List(10))

But now I would like to go back to rddInit from the RDD I just computed (the rddReduced RDD).

My first guess is to perform some kind of cross product between the key and each element of the List, like this:

import scala.collection.mutable.ListBuffer

rddReduced.map {
  case (x, y) =>
    // rebuild one (key, value) pair per element of the list
    val myList: ListBuffer[(Int, Int)] = ListBuffer()
    for (element <- y) {
      myList += ((x, element))
    }
    myList.toList
}.flatMap(x => x).take(5).foreach(println)

With this code, I get the initial RDD as a result. But I don't think using a ListBuffer inside a Spark job is good practice. Is there any other way to solve this problem?

Answer

I'm surprised no one has offered a solution using Scala's for-comprehension (which gets "desugared" into flatMap and map calls at compile time).

I don't use this syntax very often, but when I do... I find it quite entertaining. Some people prefer a for-comprehension over a series of flatMap and map calls, especially for more complex transformations.

// that's what you ended up with after `groupByKey.mapValues`
val rddReduced: RDD[(Int, List[Int])] = ...
val r = for {
  (k, values) <- rddReduced
  v <- values
} yield (k, v)

scala> :type r
org.apache.spark.rdd.RDD[(Int, Int)]

scala> r.foreach(println)
(3,10)
(2,5)
(2,7)
(1,2)
(1,3)

// even nicer to our eyes
scala> r.toDF("key", "value").show
+---+-----+
|key|value|
+---+-----+
|  1|    2|
|  1|    3|
|  2|    5|
|  2|    7|
|  3|   10|
+---+-----+
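
For reference, here is a minimal sketch of the explicit chain the comprehension above desugars into (assuming the same rddReduced as before); the pair-RDD method flatMapValues gets you there even more directly:

// roughly what the compiler turns the for-comprehension into:
// a flatMap over the pairs with an inner map over the values
val rExplicit = rddReduced.flatMap {
  case (k, values) => values.map(v => (k, v))
}

// alternatively, flatMapValues keeps the key and flattens each value list
val rFlat = rddReduced.flatMapValues(identity)

Either form avoids any mutable ListBuffer, which addresses the concern raised in the question.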

After all, that's why we enjoy the flexibility of Scala, isn't it?
