How to recursively read Hadoop files from a directory using Spark?
Problem description
Inside the given directory I have many different folders, and inside each folder I have Hadoop files (part_001, etc.):
directory
  -> folder1
     -> part_001...
     -> part_002...
  -> folder2
     -> part_001...
  ...
Given the directory, how can I recursively read the content of all folders inside this directory and load it into a single RDD in Spark using Scala?
I found this, but it does not recursively enter sub-folders (I am using import org.apache.hadoop.mapreduce.lib.input):
var job: Job = null
try {
  job = Job.getInstance()
  FileInputFormat.setInputPaths(job, new Path("s3n://" + bucketNameData + "/" + directoryS3))
  FileInputFormat.setInputDirRecursive(job, true)
} catch {
  case ioe: IOException =>
    ioe.printStackTrace()
    System.exit(1)
}

val sourceData = sc.newAPIHadoopRDD(job.getConfiguration(), classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).values
I also found this web-page that uses SequenceFile, but again I don't understand how to apply it to my case.
Recommended answer
If you are using Spark, you can do this using wildcards as follows:

scala> sc.textFile("path/*/*")
sc is the SparkContext, which is initialized by default if you are using spark-shell; if you are creating your own program, you will have to instantiate a SparkContext yourself.
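Applied to the layout in the question, a two-level wildcard covers every part file that sits exactly one folder deep (the path here is illustrative):

```scala
// Matches directory/folder1/part_001..., directory/folder2/part_001..., etc.
// Each wildcard level matches exactly one path component, so this does not
// descend into deeper nesting; for arbitrary depth, use the recursive flag.
val data = sc.textFile("directory/*/*")
```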
Note the following flag:
scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
> res6: String = null
You should set this flag to true:
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")
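Putting it together, here is a minimal standalone sketch (the bucket and path names are illustrative, not from the question) that sets the flag and then reads everything under a single top-level path:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RecursiveRead {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("RecursiveRead"))

    // Make FileInputFormat descend into sub-folders of each input path
    sc.hadoopConfiguration.set(
      "mapreduce.input.fileinputformat.input.dir.recursive", "true")

    // A single top-level path now picks up part files in all nested folders,
    // yielding one RDD over the whole tree
    val sourceData = sc.textFile("s3n://my-bucket/directory")
    println(sourceData.count())

    sc.stop()
  }
}
```

Because the flag lives in the Hadoop configuration rather than in the path, it also applies to the newAPIHadoopRDD approach from the question.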