How to join two RDDs by key to get RDD of (String, String)?
Question
I have two pair RDDs of the form RDD[(String, mutable.HashSet[String])].
For example:
rdd1: 332101231222, "320758, 320762, 320760, 320759, 320757, 320761"
rdd2: 332101231222, "220758, 220762, 220760, 220759, 220757, 220761"
I want to combine rdd1 and rdd2 based on common keys, so the output should look like this:
332101231222 320758, 320762, 320760, 320759, 320757, 320761 220758, 220762, 220760, 220759, 220757, 220761
Here is my code:
import org.apache.spark.rdd.RDD
import scala.collection.mutable

def cogroupTest(rdd1: RDD[(String, mutable.HashSet[String])], rdd2: RDD[(String, mutable.HashSet[String])]): Unit = {
  val prods_per_user_co_grouped = rdd1.cogroup(rdd2)
  prods_per_user_co_grouped.map { case (key: String, (value1: mutable.HashSet[String], value2: mutable.HashSet[String])) =>
    val combinedhs = value1 ++ value2
    val sstr = combinedhs.mkString("\t")
    val keypadded = key + "\t"
    s"$keypadded$sstr"
  }.saveAsTextFile("/scratch/rdds_joined/")
}
Here is the error that I get when I run my program:
scala.MatchError: (32101231222,(CompactBuffer(Set(320758, 320762, 320760, 320759, 320757, 320761)),CompactBuffer(Set(220758, 220762, 220760, 220759, 220757, 220761)))) (of class scala.Tuple2)
Any help with this would be great!
Recommended answer
As you might guess from the name, cogroup groups observations by key. This means that in your case you get:
(String, (Iterable[mutable.HashSet[String]], Iterable[mutable.HashSet[String]]))
not
(String, (mutable.HashSet[String], mutable.HashSet[String]))
This is pretty clear when you look at the error you get. If you want to combine pairs, you should use the join method. If not, you should adjust the pattern to match the structure you get and then use something like this:
val combinedhs = value1.reduce(_ ++ _) ++ value2.reduce(_ ++ _)
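Both options can be sketched side by side. This is only an illustration under stated assumptions: the names JoinByKeySketch, formatLine, joinLines and cogroupLines are hypothetical, the sample data is shortened, and a local master is assumed for the demo. One caveat about the cogroup route: when a key exists in only one of the two RDDs, the other side is an empty Iterable and reduce throws on it, so the sketch uses flatten instead.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable

// Hypothetical helper object; none of these names come from the question.
object JoinByKeySketch {

  // Merge all sets seen for a key on either side into one tab-separated line.
  // flatten tolerates an empty side, where reduce would throw.
  def formatLine(key: String,
                 left: Iterable[mutable.HashSet[String]],
                 right: Iterable[mutable.HashSet[String]]): String = {
    val values = (left.flatten ++ right.flatten).toSeq.distinct
    (key +: values).mkString("\t")
  }

  // Option 1: join keeps only keys present in both RDDs and yields one
  // value per side, so the value is a plain pair of sets.
  def joinLines(rdd1: RDD[(String, mutable.HashSet[String])],
                rdd2: RDD[(String, mutable.HashSet[String])]): RDD[String] =
    rdd1.join(rdd2).map { case (key, (v1, v2)) => formatLine(key, Seq(v1), Seq(v2)) }

  // Option 2: cogroup keeps every key and yields an Iterable of sets per side.
  def cogroupLines(rdd1: RDD[(String, mutable.HashSet[String])],
                   rdd2: RDD[(String, mutable.HashSet[String])]): RDD[String] =
    rdd1.cogroup(rdd2).map { case (key, (vs1, vs2)) => formatLine(key, vs1, vs2) }

  def main(args: Array[String]): Unit = {
    // Assumption: a local master, for demonstration only.
    val conf = new SparkConf().setMaster("local[*]").setAppName("join-by-key-sketch")
    val sc = new SparkContext(conf)
    val rdd1 = sc.parallelize(Seq("332101231222" -> mutable.HashSet("320758", "320762")))
    val rdd2 = sc.parallelize(Seq("332101231222" -> mutable.HashSet("220758", "220762")))
    joinLines(rdd1, rdd2).collect().foreach(println)
    cogroupLines(rdd1, rdd2).collect().foreach(println)
    sc.stop()
  }
}
```

The two differ in more than shape: join drops keys that appear in only one RDD and, if a key occurs several times, emits one line per pair of matching records, while cogroup emits exactly one line per key. The order of values within a line is not guaranteed, because a HashSet has no defined iteration order.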