spark: access rdd inside another rdd
Problem description
I have a lookup rdd of size 6000, lookup_rdd: RDD[String]
a1
a2
a3
a4
a5
.....
and another rdd, data_rdd: RDD[(String, Iterable[(String, Int)])]: (id,(item,count)) which has unique ids,
(id1,List((a1,2), (a3,4)))
(id2,List((a2,1), (a4,2), (a1,1)))
(id3,List((a5,1)))
For each element in lookup_rdd I want to check whether each id has that element or not; if it is there I put the count, and if it's not I put 0, and store the result in a file.
What is an efficient way to achieve this? Is hashing possible? E.g. the output I want is:
id1,2,0,4,0,0
id2,1,1,0,2,0
id3,0,0,0,0,1
This is what I have tried:
val headers = lookup_rdd.zipWithIndex().persist()
val indexing = data_rdd.map { line =>
  val id = line._1
  val item_cnt_list = line._2
  val arr = Array.fill[Byte](6000)(0)
  item_cnt_list.map(c => (headers.lookup(c._1), c._2))
}
indexing.collect().foreach(println)
I get this exception:
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations
Recommended answer
The bad news is that you cannot use an RDD inside another RDD.
The good news is that for your use case, assuming that the 6000 entries are fairly small, there is an ideal solution: collect the RDD on the driver, broadcast it back to each node of the cluster and use it within the other RDD as you did before.
val sc: SparkContext = ???
val headers = sc.broadcast(lookup_rdd.zipWithIndex.collect().toMap)
val indexing = data_rdd.map { case (_, item_cnt_list) =>
  item_cnt_list.map { case (k, v) => (headers.value(k), v) }
}
indexing.collect().foreach(println)
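If you also want the final 0-filled vectors from the question (one row per id, one column per lookup entry) written out to a file, a minimal sketch along these lines should work. It builds on the broadcast headers map from the snippet above, assumes the 6000-entry index fits comfortably on the driver, orders the columns by lookup_rdd's zipWithIndex positions, and uses a purely illustrative output path:

// reuse the broadcast index map: headers.value is Map[String, Long]
val width = headers.value.size
val rows = data_rdd.map { case (id, item_cnt_list) =>
  // start from an all-zero row and fill in the counts present for this id
  val arr = Array.fill[Int](width)(0)
  item_cnt_list.foreach { case (item, cnt) =>
    arr(headers.value(item).toInt) = cnt
  }
  (id +: arr.map(_.toString)).mkString(",")  // e.g. "id1,2,0,4,0,0"
}
rows.saveAsTextFile("counts_output")  // illustrative output path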