How to join two RDDs by key to get RDD of (String, String)?

Question

I have two paired RDDs of the form RDD[(String, mutable.HashSet[String])]:

For example:

rdd1: 332101231222, "320758, 320762, 320760, 320759, 320757, 320761"
rdd2: 332101231222, "220758, 220762, 220760, 220759, 220757, 220761"

I want to combine rdd1 and rdd2 based on their common keys, so the output should look like this:

332101231222 320758,320762,320760,320759,320757,320761 220758,220762,220760,220759,220757,220761

Here is my code:

import org.apache.spark.rdd.RDD
import scala.collection.mutable

def cogroupTest(rdd1: RDD[(String, mutable.HashSet[String])],
                rdd2: RDD[(String, mutable.HashSet[String])]): Unit = {

  val prods_per_user_co_grouped = rdd1.cogroup(rdd2)

  prods_per_user_co_grouped.map {
    case (key: String, (value1: mutable.HashSet[String], value2: mutable.HashSet[String])) =>
      val combinedhs = value1 ++ value2
      val sstr = combinedhs.mkString("\t")
      val keypadded = key + "\t"
      s"$keypadded$sstr"
  }.saveAsTextFile("/scratch/rdds_joined/")
}

Here is the error I get when I run my program:

scala.MatchError: (32101231222,(CompactBuffer(Set(320758, 320762, 320760, 320759, 320757, 320761)),CompactBuffer(Set(220758, 220762, 220760, 220759, 220757, 220761)))) (of class scala.Tuple2)

Any help with this would be great!

Recommended answer

As you might guess from its name, cogroup groups observations by key. This means that in your case you get:

(String, (Iterable[mutable.HashSet[String]], Iterable[mutable.HashSet[String]]))

rather than:

(String, (mutable.HashSet[String], mutable.HashSet[String]))
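The mismatch can be reproduced without Spark. The following plain-Scala sketch is an illustration only: it assumes a `List` as a stand-in for Spark's internal `CompactBuffer` (both are `Iterable`s wrapping the original sets), and the names `CogroupShapeDemo`, `questionPattern`, and `shapePattern` are hypothetical.

```scala
import scala.collection.mutable
import scala.util.Try

object CogroupShapeDemo {
  // The shape cogroup produces: key -> (all sets from rdd1, all sets from rdd2).
  type Grouped = (String, (Iterable[mutable.HashSet[String]], Iterable[mutable.HashSet[String]]))

  // One cogrouped record; List stands in for Spark's CompactBuffer (assumption).
  val record: Grouped =
    ("332101231222",
      (List(mutable.HashSet("320758", "320762")),
       List(mutable.HashSet("220758", "220762"))))

  // Mirrors the question's pattern: it expects bare HashSets inside the tuple,
  // so the runtime type test fails and a scala.MatchError is thrown.
  def questionPattern(r: Grouped): String = r match {
    case (key, (_: mutable.HashSet[_], _: mutable.HashSet[_])) => key
  }

  // Matches the structure cogroup actually returns: one Iterable per side.
  def shapePattern(r: Grouped): Int = r match {
    case (_, (sets1, sets2)) => sets1.size + sets2.size
  }

  def main(args: Array[String]): Unit = {
    println(Try(questionPattern(record))) // a Failure wrapping scala.MatchError
    println(shapePattern(record))         // 2: one HashSet on each side
  }
}
```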

This is clear from the error you get. If you want to combine pairs, you should use the join method. If not, you should adjust the pattern to match the structure you actually get, and then use something like this:

val combinedhs = value1.reduce(_ ++ _) ++ value2.reduce(_ ++ _)
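Both options can be sketched end to end as follows. This is a minimal sketch, assuming Spark 1.3 or later with `spark-core` on the classpath and a local master; `JoinSetsSketch`, `viaJoin`, and `viaCogroup` are hypothetical names. Each set is written as its own comma-separated column to match the output format asked for in the question, and elements are sorted only to make the output deterministic (a `HashSet` has no defined order). The cogroup variant uses `flatten` rather than `reduce`: `cogroup` also emits keys that exist in only one RDD, with an empty `Iterable` on the other side, and `reduce` throws on an empty collection.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable

object JoinSetsSketch {
  type SetRDD = RDD[(String, mutable.HashSet[String])]

  // Option 1: join yields exactly one (HashSet, HashSet) pair per key that
  // exists in both RDDs, which is the shape the question's pattern expected.
  def viaJoin(rdd1: SetRDD, rdd2: SetRDD): RDD[String] =
    rdd1.join(rdd2).map { case (key, (value1, value2)) =>
      s"$key\t${value1.toSeq.sorted.mkString(",")}\t${value2.toSeq.sorted.mkString(",")}"
    }

  // Option 2: keep cogroup but match (Iterable, Iterable) and merge the sets.
  // flatten is safe for an empty side, unlike reduce.
  def viaCogroup(rdd1: SetRDD, rdd2: SetRDD): RDD[String] =
    rdd1.cogroup(rdd2).map { case (key, (sets1, sets2)) =>
      val left = sets1.flatten.toSeq.distinct.sorted.mkString(",")
      val right = sets2.flatten.toSeq.distinct.sorted.mkString(",")
      s"$key\t$left\t$right"
    }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("join-sets-sketch").setMaster("local[*]"))
    try {
      val rdd1 = sc.parallelize(Seq("332101231222" -> mutable.HashSet("320758", "320762")))
      val rdd2 = sc.parallelize(Seq(
        "332101231222" -> mutable.HashSet("220758", "220762"),
        "999" -> mutable.HashSet("1"))) // key present only in rdd2
      viaJoin(rdd1, rdd2).collect().foreach(println)    // only the shared key
      viaCogroup(rdd1, rdd2).collect().foreach(println) // also "999" with an empty left column
    } finally sc.stop()
  }
}
```

With `join`, keys present in only one RDD are dropped; with `cogroup`, they are kept with an empty column, so the choice depends on which semantics you need.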
