函数返回星火空List [英] Function returns an empty List in Spark
本文介绍了函数返回星火空List的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
下面是code为获得一个压缩文件的文件名列表
Below is code for getting list of file Names in a zipped file
def getListOfFilesInRepo(zipFileRDD : RDD[(String,PortableDataStream)]) : (List[String]) = {
val zipInputStream = zipFileRDD.values.map(x => new ZipInputStream(x.open))
val filesInZip = new ArrayBuffer[String]()
var ze : Option[ZipEntry] = None
zipInputStream.foreach(stream =>{
do{
ze = Option(stream.getNextEntry);
ze.foreach{ze =>
if(ze.getName.endsWith("java") && !ze.isDirectory()){
var fileName:String = ze.getName.substring(ze.getName.lastIndexOf("/")+1,ze.getName.indexOf(".java"))
filesInZip += fileName
}
}
stream.closeEntry()
} while(ze.isDefined)
println(filesInZip.toList.length) // print 889 (correct)
})
println(filesInZip.toList.length) // print 0 (WHY..?)
(filesInZip.toList)
}
我在下面的方式执行上面code:
I execute above code in the following manner :
scala> val zipFileRDD = sc.binaryFiles("./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip")
zipFileRDD: org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)] = ./handsOn/repo~apache~storm~14135470~false~Java~master~2210.zip BinaryFileRDD[17] at binaryFiles at <console>:25
scala> getListOfFilesInRepo(zipRDD)
889
0
res12: List[String] = List()
为什么我没有得到889,而是让0?
Why i am not getting 889 and instead getting 0?
推荐答案
这是因为 filesInZip
不是工人之间共享。 的foreach
运行在 filesInZip
的本地副本,当它完成这个副本简单地抛弃,垃圾收集。如果你想保留的结果,你应该使用变换(最有可能是 flatMap
),并返回收集汇总值。
It happens because filesInZip
is not shared between workers. foreach
operates on a local copy of filesInZip
and when it finishes this copy is simply discarded and garbage collected. If you want to keep the results you should use transformation (most likely a flatMap
) and return collected aggregated values.
def listFiles(stream: PortableDataStream): TraversableOnce[String] = ???
zipInputStream.flatMap(listFiles)
这篇关于函数返回星火空List的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文