How to access a lookup (broadcast) RDD (or dataset) inside another RDD's map function


Question

I am new to Spark and Scala and just started learning... I am using Spark 1.0.0 on CDH 5.1.3.

I have a broadcasted RDD named dbTableKeyValueMap: RDD[(String, String)], and I want to use dbTableKeyValueMap to process my fileRDD (each row has 300+ columns). This is the code:

val get = fileRDD.map({x =>
  val tmp = dbTableKeyValueMap.lookup(x)
  tmp
})

Running this locally hangs and/or after some time gives the error:

scala.MatchError: null
at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:571)

I can understand that accessing one RDD inside another runs into problems once locality and collection size come into the picture. Taking a Cartesian product is not an option for me, because the records in fileRDD are huge (each row has 300+ columns). Just as I used the distributed cache to load dbTableKeyValueMap in the setup method and then used it in the map of Hadoop Java MapReduce code, I want to do something similar in the Spark map... I could not find a simple example for this use case. I want to iterate over the fileRDD rows one by one and do some transformation, beautification, lookups, etc. on each column for further processing. Alternatively, is there a way I can use dbTableKeyValueMap as a Scala collection instead of a Spark RDD?

Please help.

Answer

Thanks... The easiest thing to do was to convert the lookup RDD into a Scala collection and broadcast it; then I was good to go! I am able to access it inside transformations of any RDD.

// Collect the small lookup RDD to the driver as a plain Scala Map,
// then broadcast it so every executor gets a read-only copy.
val scalaMap = dbTableKeyValueMap.collectAsMap.toMap
val broadCastLookupMap = sc.broadcast(scalaMap)

// Look up each key against the broadcast map inside the transformation.
// Note: .head throws if the key is missing from the map.
val get = fileRDD.map({x =>
  val tmp = broadCastLookupMap.value.get(x).head
  tmp
})
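One thing to watch for: if a key from fileRDD is missing from the map, the .head call on the Option above will throw a NoSuchElementException. A minimal sketch of a safer variant using getOrElse (the name resolved and the empty-string default are just assumptions for illustration):

// Same collect-and-broadcast pattern as above.
val scalaMap = dbTableKeyValueMap.collectAsMap.toMap
val broadCastLookupMap = sc.broadcast(scalaMap)

// getOrElse returns a default value instead of throwing when the key is absent.
val resolved = fileRDD.map({ x =>
  broadCastLookupMap.value.getOrElse(x, "")
})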

This easy solution should be documented somewhere for early learners... It took me a while to figure it out.

Thanks for your help...
