How to recursively read Hadoop files from a directory using Spark?


Problem Description

Inside a given directory I have many different folders, and inside each folder I have Hadoop files (part_001, etc.).

directory
   -> folder1
      -> part_001...
      -> part_002...
   -> folder2
      -> part_001...
   ...

Given the directory, how can I recursively read the content of all folders inside this directory and load this content into a single RDD in Spark using Scala?

I found this, but it does not recursively enter sub-folders (I am using import org.apache.hadoop.mapreduce.lib.input):

  import java.io.IOException
  import org.apache.hadoop.fs.Path
  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapreduce.Job
  import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}

  var job: Job = null
  try {
    job = Job.getInstance()
    // Point the job at the S3 directory and request recursive directory listing
    FileInputFormat.setInputPaths(job, new Path("s3n://" + bucketNameData + "/" + directoryS3))
    FileInputFormat.setInputDirRecursive(job, true)
  } catch {
    case ioe: IOException => ioe.printStackTrace(); System.exit(1)
  }
  // Read with the new Hadoop API and keep only the line contents (values)
  val sourceData = sc.newAPIHadoopRDD(job.getConfiguration(), classOf[TextInputFormat],
    classOf[LongWritable], classOf[Text]).values

I also found this web page that uses SequenceFile, but again I don't understand how to apply it to my case.

Recommended Answer

If you are using Spark, you can do this with wildcards as follows:

scala> sc.textFile("path/*/*")

sc is the SparkContext, which is initialized by default if you are using spark-shell; if you are writing your own program, you will have to instantiate a SparkContext yourself.
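
If you do need to create it yourself, a minimal sketch of instantiating a SparkContext in a standalone application might look like this (the application name and the local master URL below are placeholders, not part of the original answer):

  import org.apache.spark.{SparkConf, SparkContext}

  // Placeholder configuration: adjust the app name and master URL for your cluster
  val conf = new SparkConf()
    .setAppName("recursive-read-example")
    .setMaster("local[*]")
  val sc = new SparkContext(conf)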

Be aware of this flag:

scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
res6: String = null

You should set this flag to true:

sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive","true")
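
Putting the two pieces together, a minimal sketch (assuming a hypothetical root folder named directory, laid out as in the question) could be:

  // Enable recursive listing of input directories, as recommended above
  sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")

  // With the flag set, textFile should also pick up files in nested sub-folders;
  // alternatively, the wildcard form sc.textFile("directory/*/*") works without the flag.
  val lines = sc.textFile("directory")
  println(lines.count())  // e.g. total number of lines across all part files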

