How do I iterate RDD's in apache spark (scala)


Problem description


I use the following command to fill an RDD with a bunch of arrays containing 2 strings ["filename", "content"].

Now I want to iterate over each of those occurrences to do something with each filename and content.

val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")

However, I can't seem to find any documentation on how to do this.

So what I want is this:

foreach occurrence-in-the-rdd{
   //do stuff with the array found on location n of the RDD
}

Solution

You call various methods on the RDD that accept functions as parameters.

import org.apache.spark.{SparkConf, SparkContext}

// set up an example
val sparkConf = new SparkConf().setMaster("local").setAppName("Example")
val sc = new SparkContext(sparkConf)
val testData = Array(Array(1,2,3), Array(4,5,6,7,8))
val testRDD = sc.parallelize(testData, 2)

// print the size of each array
testRDD.collect().foreach(a => println(a.size))

// create an RDD with the array sizes and print it
val countRDD = testRDD.map(a => a.size)
countRDD.collect().foreach(a => println(a))

// create an RDD with just the longer arrays and print each array
val bigRDD = testRDD.filter(a => a.size > 3)
bigRDD.collect().foreach(a => {
    a.foreach(e => print(e + " "))
    println()
  })

Notice that the functions you write accept a single RDD element as input, and return data of some uniform type, so you create an RDD of the latter type. For example, countRDD is an RDD[Int], while bigRDD is still an RDD[Array[Int]].
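
To connect this back to the question: wholeTextFiles() produces an RDD of (filename, content) pairs, so the function you pass can pattern-match on the tuple. Below is a minimal sketch along those lines; the HDFS path is the one from the question, and counting lines per file is just a placeholder for whatever you actually want to do with each file:

val someRDD = sc.wholeTextFiles("hdfs://localhost:8020/user/cloudera/*")

// each element is a (filename, content) pair
val lineCounts = someRDD.map { case (filename, content) =>
  (filename, content.split("\n").length)  // placeholder: count lines per file
}

lineCounts.collect().foreach { case (filename, n) => println(filename + ": " + n) }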

It will probably be tempting at some point to write a foreach that modifies some other data, but you should resist for reasons described in this question and answer.
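
To make the pitfall concrete, here is a rough sketch: mutating a driver-side variable from inside foreach is unreliable, because each task works on its own copy of the closure. If you genuinely need a side-effecting aggregate, an accumulator is the supported mechanism (the sketch uses sc.longAccumulator, which assumes Spark 2.0+; older versions used sc.accumulator):

// Don't: counter is copied into each task's closure, so the driver-side
// value may never be updated, especially when running on a cluster
var counter = 0
testRDD.foreach(a => counter += a.size)

// Do: use an accumulator for a side-effecting total
val total = sc.longAccumulator("total elements")
testRDD.foreach(a => total.add(a.size))
println(total.value)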

Edit: Don't try to print large RDDs

Several readers have asked about using collect() and println() to see their results, as in the example above. Of course, this only works if you're running in an interactive mode like the Spark REPL (read-eval-print loop). It's best to call collect() on the RDD to get a sequential array for orderly printing. But collect() may bring back too much data, and in any case too much may be printed. Here are some alternative ways to get insight into your RDDs if they're large:

  1. RDD.take(): This gives you fine control over the number of elements you get, but not where they come from -- they are defined as the "first" ones, a concept dealt with by various other questions and answers here.

    // take() returns an Array so no need to collect()
    myHugeRDD.take(20).foreach(a => println(a))
    

  2. RDD.sample(): This lets you (roughly) control the fraction of results you get, whether sampling uses replacement, and even optionally the random number seed.

    // sample() does return an RDD so you may still want to collect()
    myHugeRDD.sample(true, 0.01).collect().foreach(a => println(a))
    

  3. RDD.takeSample(): This is a hybrid: it uses random sampling that you can control, but lets you specify the exact number of results and returns an Array.

    // takeSample() returns an Array so no need to collect() 
    myHugeRDD.takeSample(true, 20).foreach(a => println(a))
    

  4. RDD.count(): Sometimes the best insight comes from how many elements you ended up with -- I often do this first.

    println(myHugeRDD.count())       
    
