spark: access rdd inside another rdd


Problem description

I have a lookup rdd of size 6000, lookup_rdd: RDD[String]

a1 a2 a3 a4 a5 .....

and another rdd, data_rdd: RDD[(String, Iterable[(String, Int)])], i.e. (id, (item, count)), which has unique ids:

(id1,List((a1,2), (a3,4))) (id2,List((a2,1), (a4,2), (a1,1))) (id3,List((a5,1)))

For each element in lookup_rdd I want to check whether each id has that element or not; if it does, I put the count, and if it doesn't, I put 0, and store the result in a file.

What is an efficient way to achieve this? Is hashing possible? E.g. the output I want is:

id1,2,0,4,0,0 id2,1,1,0,2,0 id3,0,0,0,0,1

I have tried this:

val headers = lookup_rdd.zipWithIndex().persist()
val indexing = data_rdd.map { line =>
  val id = line._1
  val item_cnt_list = line._2
  val arr = Array.fill[Byte](6000)(0)
  item_cnt_list.map(c => (headers.lookup(c._1), c._2))
}
indexing.collect().foreach(println)

I get the exception:

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations

Answer

The bad news is that you cannot use an RDD within another.

The good news is that for your use case, assuming that the 6000 entries are fairly small, there is an ideal solution: collect the RDD on the driver, broadcast it back to each node of the cluster and use it within the other RDD as you did before.

val sc: SparkContext = ???
// collect the small lookup RDD on the driver and broadcast the item -> index map
val headers = sc.broadcast(lookup_rdd.zipWithIndex.collect().toMap)
val indexing = data_rdd.map { case (_, item_cnt_list) =>
  // headers.value is a plain Map here, so it is safe to use inside the transformation
  item_cnt_list.map { case (k, v) => (headers.value(k), v) }
}
indexing.collect().foreach(println)
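
To get the exact zero-filled rows from the question and write them to a file, the broadcast map can be extended along these lines. This is only a sketch building on the answer above; the output path is a placeholder, not part of the original post.

val width = headers.value.size
val rows = data_rdd.map { case (id, item_cnt_list) =>
  // start from a zero-filled row and fill in the counts present for this id
  val counts = Array.fill[Int](width)(0)
  item_cnt_list.foreach { case (item, cnt) =>
    counts(headers.value(item).toInt) = cnt  // zipWithIndex yields Long indices
  }
  (id +: counts.map(_.toString)).mkString(",")
}
rows.saveAsTextFile("output_dir")  // placeholder path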
