How to use an RDD inside another RDD's map method?


Problem description

I have an RDD named index: RDD[(String, String)], and I want to use index to process my file. This is the code:

val get = file.map({x =>
  val tmp = index.lookup(x).head
  tmp
})

The problem is that I cannot use index inside the file.map function. When I ran the program, it gave me this error:

14/12/11 16:22:27 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 602, spark2): scala.MatchError: null
        org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:770)
        com.ynu.App$$anonfun$12.apply(App.scala:270)
        com.ynu.App$$anonfun$12.apply(App.scala:265)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        scala.collection.AbstractIterator.to(Iterator.scala:1157)
        scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1080)
        org.apache.spark.rdd.RDD$$anonfun$28.apply(RDD.scala:1080)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)

I don't know why this happens. If I want to implement this functionality, what should I do? Thanks.

Recommended answer

You should think of RDDs as virtual collections. An RDD reference only points to where the data lives; it holds no data itself, so there is no point in using it inside a closure that is shipped to the executors.

You will need to use functions that combine RDDs together in order to achieve the desired functionality. Also, lookup as defined here is a very sequential process that requires all the lookup data to be available in the memory of each worker, so it will not scale.

To resolve each element of the file RDD to its value in index, you should join the two RDDs:

val resolvedFileRDD = file.keyBy(identity).join(index) // result has the form (key, (key, value-from-index))
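
For illustration, here is a minimal, self-contained sketch of the join approach. The sample data, the object name, and the local master setting are made up for this example; file and index are the names from the question:

import org.apache.spark.{SparkConf, SparkContext}

object JoinLookupExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("join-lookup").setMaster("local[*]"))

    // Stand-ins for the question's RDDs (hypothetical sample data).
    val index = sc.parallelize(Seq("a" -> "1", "b" -> "2")) // RDD[(String, String)]
    val file  = sc.parallelize(Seq("a", "b", "a"))          // RDD[String]

    // keyBy(identity) turns each element x into (x, x); joining with index
    // then yields (key, (key, value)), from which we keep only the value.
    val resolved = file.keyBy(identity)
      .join(index)
      .map { case (_, (_, value)) => value }

    resolved.collect().foreach(println) // prints 1, 2, 1 (in no particular order)

    sc.stop()
  }
}

Note that join is an inner join, so elements of file without a matching key in index are dropped; leftOuterJoin keeps them, with the looked-up value wrapped in an Option.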

