Save a spark RDD using mapPartition with iterator


Problem Description

I have some intermediate data that I need to store in HDFS and locally as well. I'm using Spark 1.6. As the intermediate form in HDFS I'm getting data in /output/testDummy/part-00000 and /output/testDummy/part-00001. I want to save these partitions locally using Java/Scala so that I could save them as /users/home/indexes/index.nt (by merging both locally) or as /users/home/indexes/index-0000.nt and /home/indexes/index-0001.nt separately.

Here is my code. Note: testDummy is the same as test, and the output has two partitions. I want to store them separately or combined, but locally, with an index.nt file. I'd prefer to store them separately on two data nodes. I'm using a cluster and submitting the Spark job on YARN. I also added some comments showing how many times each line runs and what data I'm getting. How can I do this? Any help is appreciated.

 val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy")
 println("testDummy done")   //1 time print

def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
    println("Inside savesData")                                 //  now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
    println("iter size"+iterator.size)                           //  2 735 2 735 values
    val filenamesWithExtension = outputPath + "/index.nt"
    println("filenamesWithExtension "+filenamesWithExtension.length)   //4 times
    var list = List[(String)]()

    val fileWritter = new FileWriter(filenamesWithExtension,true)
    val bufferWritter = new BufferedWriter(fileWritter)

     while (iterator.hasNext){                       //iterator.hasNext is false
       println("inside iterator")                    //0 times 
       val dat = iterator.next()
       println("datadata "+iterator.next())

       bufferWritter.write(dat + "\n")
       bufferWritter.flush()
       println("index files written")

       val dataElements = dat.split(" ")
       println("dataElements")                                    //0
       list = list.::(dataElements(0))
       list = list.::(dataElements(1))
       list = list.::(dataElements(2))
     }
    bufferWritter.close() //closing
    println("savesData method end")                         //4 times when coal=2
    list.iterator
}

println("before saving data into local")                              //1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions "+test.getNumPartitions)                               //2
println("testRDD size "+test.collect().length)                                //0
println("after saving data into local")   //1

PS: I followed this and this, but they are not exactly what I'm looking for. I tried it anyway, but I'm not getting anything in index.nt.

Solution

A couple of things:

  • Never call Iterator.size if you plan to use the data later. Iterators are TraversableOnce: the only way to compute an Iterator's size is to traverse all of its elements, and after that there is no more data left to read. That is why the while loop in savesData never runs (see the first sketch below).
  • Don't use transformations like mapPartitions for side effects. If you want to perform some kind of IO, use actions like foreach / foreachPartition instead. Relying on a transformation is bad practice and doesn't guarantee that a given piece of code will be executed only once (second sketch below).
  • A local path inside an action or a transformation is a local path on the particular worker it runs on. If you want to write directly on the client machine, you should first fetch the data with collect or toLocalIterator (third sketch below). It could be better, though, to write to distributed storage and fetch the data later.
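
To make the first point concrete, here is a minimal sketch (plain Scala, no Spark needed) of what happens in savesData: calling size exhausts the iterator, so the subsequent hasNext check is false and the while loop body never executes.

    // Computing the size traverses the iterator, so nothing is left afterwards.
    val it = Iterator("s1 p1 o1", "s2 p2 o2")
    println("size = " + it.size)        // 2 -- but this already consumed both elements
    println("hasNext = " + it.hasNext)  // false -- exactly why the while loop never runs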
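
For the second point, a minimal sketch of how the same writing could be done with foreachPartition (an action) instead of mapPartitions, reusing outputFlatMapTuples, Constants.INITIAL_PARTITIONS and outputPath from the question. The index-<partition>.nt file naming and the use of TaskContext.getPartitionId to build it are only illustrative assumptions. Each executor writes its own partition to its own local file system:

    import java.io.{BufferedWriter, FileWriter}
    import org.apache.spark.TaskContext

    outputFlatMapTuples
      .coalesce(Constants.INITIAL_PARTITIONS)
      .foreachPartition { iter =>
        // Runs on the worker that owns this partition.
        val partId = TaskContext.getPartitionId()                      // e.g. 0, 1
        val writer = new BufferedWriter(
          new FileWriter(s"$outputPath/index-$partId.nt"))
        try {
          iter.foreach { line => writer.write(line); writer.newLine() }
        } finally {
          writer.close()                                               // always close, even on failure
        }
      }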
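
For the third point, a minimal sketch of merging everything into a single file on the client/driver machine, assuming the /users/home/indexes/index.nt path from the question is meant to live there. toLocalIterator pulls the partitions back one at a time, so the whole RDD never has to fit in driver memory at once (collect would load it all). If HDFS output is acceptable, a plain saveAsTextFile, as in the testDummy line, remains the simpler option.

    import java.io.{BufferedWriter, FileWriter}

    // Runs on the driver: stream partitions back one by one and merge them locally.
    val writer = new BufferedWriter(new FileWriter("/users/home/indexes/index.nt"))
    try {
      outputFlatMapTuples.toLocalIterator.foreach { line =>
        writer.write(line)
        writer.newLine()
      }
    } finally {
      writer.close()
    }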
