Save a spark RDD using mapPartition with iterator
Problem description
I have some intermediate data that I need to store in HDFS and locally as well. I'm using Spark 1.6. In HDFS, as an intermediate form, I'm getting data in /output/testDummy/part-00000 and /output/testDummy/part-00001. I want to save these partitions locally using Java/Scala, either as /users/home/indexes/index.nt (by merging both locally) or as /users/home/indexes/index-0000.nt and /home/indexes/index-0001.nt separately.
Here is my code:
Note: testDummy is the same as test; the output has two partitions. I want to store them separately, or combined, in a local index.nt file. I'd prefer to store them separately on the two data nodes. I'm using a cluster and submitting the Spark job on YARN. I also added some comments noting how many times each line runs and what data I'm getting. How can I do this? Any help is appreciated.
val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS + "/testDummy")
println("testDummy done") // printed 1 time

def savesData(iterator: Iterator[String]): Iterator[String] = {
  println("Inside savesData") // now 4 times when coalesce(Constants.INITIAL_PARTITIONS) = 2
  println("iter size " + iterator.size) // 2 735 2 735 values
  val filenamesWithExtension = outputPath + "/index.nt"
  println("filenamesWithExtension " + filenamesWithExtension.length) // 4 times
  var list = List[String]()

  val fileWritter = new FileWriter(filenamesWithExtension, true)
  val bufferWritter = new BufferedWriter(fileWritter)

  while (iterator.hasNext) { // iterator.hasNext is false
    println("inside iterator") // 0 times
    val dat = iterator.next()
    println("datadata " + iterator.next())
    bufferWritter.write(dat + "\n")
    bufferWritter.flush()
    println("index files written")

    val dataElements = dat.split(" ")
    println("dataElements") // 0
    list = list.::(dataElements(0))
    list = list.::(dataElements(1))
    list = list.::(dataElements(2))
  }
  bufferWritter.close() // closing
  println("savesData method end") // 4 times when coalesce = 2
  list.iterator
}

println("before saving data into local") // 1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions " + test.getNumPartitions) // 2
println("testRDD size " + test.collect().length) // 0
println("after saving data into local") // 1
PS: I followed this and this, but it's not exactly the same as what I'm looking for. I got something working, but I'm not getting anything in index.nt.
A couple of things:

- Never call Iterator.size if you plan to use the data later. Iterators are TraversableOnce. The only way to compute an Iterator's size is to traverse all of its elements, and after that there is no more data left to read.
- Don't use transformations like mapPartitions for side effects. If you want to perform some kind of IO, use actions like foreach / foreachPartition instead (see the first sketch below). Using a transformation for this is bad practice and doesn't guarantee that a given piece of code will be executed only once.
- A local path inside an action or a transformation is a local path on a particular worker. If you want to write directly on the client machine, you should first fetch the data with collect or toLocalIterator (see the second sketch below). It could be better, though, to write to distributed storage and fetch the data later.
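The first point explains the empty output: iterator.size in savesData consumes the whole iterator, so by the time the while loop checks hasNext there is nothing left. Following the second point, here is a minimal sketch of doing the per-partition local write inside an action instead. It reuses outputPath and Constants.INITIAL_PARTITIONS from the question; the index-NNNN.nt naming and the use of TaskContext.getPartitionId() for the suffix are illustrative assumptions, and outputPath must exist as a directory on every worker node.

```scala
import java.io.{BufferedWriter, File, FileWriter}
import org.apache.spark.TaskContext

// Write each partition to its own local file on whichever worker runs it.
// TaskContext.getPartitionId() gives a stable per-partition suffix, e.g. index-0000.nt.
outputFlatMapTuples
  .coalesce(Constants.INITIAL_PARTITIONS)
  .foreachPartition { iterator =>
    val partId = TaskContext.getPartitionId()
    val file = new File(outputPath, f"index-$partId%04d.nt")
    val writer = new BufferedWriter(new FileWriter(file))
    try {
      iterator.foreach { line =>
        writer.write(line)
        writer.newLine()
      }
    } finally {
      writer.close() // always release the file handle, even on failure
    }
  }
```

Note that even actions can re-run a partition if a task fails and is retried, so the write should tolerate being repeated; here FileWriter without the append flag truncates the file, which makes a retry harmless, whereas the append mode used in the question would duplicate lines.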
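For the merged /users/home/indexes/index.nt on the machine where the driver runs, the third point applies: pull the data to the driver and write one file there. A sketch under the same assumptions; toLocalIterator streams one partition at a time, so only a single partition has to fit in driver memory, unlike collect, which materializes everything at once:

```scala
import java.io.{BufferedWriter, File, FileWriter}

val merged = new File("/users/home/indexes/index.nt")
val writer = new BufferedWriter(new FileWriter(merged))
try {
  // toLocalIterator pulls one partition at a time to the driver,
  // writing lines as they arrive instead of buffering the whole RDD.
  outputFlatMapTuples
    .coalesce(Constants.INITIAL_PARTITIONS)
    .toLocalIterator
    .foreach { line =>
      writer.write(line)
      writer.newLine()
    }
} finally {
  writer.close()
}
```

As the answer says, it can be more robust to keep writing to distributed storage with saveAsTextFile and merge afterwards on the client, for example with hdfs dfs -getmerge.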