Recursively fetch file contents from subdirectories using sc.textFile
Problem description
It seems that SparkContext's textFile expects only plain files to be present at the given directory location; it does not
- (a) recurse into subdirectories, or
- (b) even support directories at all (it tries to read directories as files)
Any suggestions on how to structure this recursion, ideally something simpler than manually building the recursive file list / descent logic?
Here is the use case: files under
/data/tables/my_table
I want to be able to read via an hdfs call all the files at all directory levels under that parent directory.
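For completeness, the manual descent the question hopes to avoid is short to sketch. Below is a minimal local-filesystem version (the class and method names are my own, chosen for illustration); an HDFS version would go through the Hadoop FileSystem API rather than java.io.File:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Recursively collect every regular file under a root directory.
// Local-filesystem sketch only; on HDFS the analogous walk would use
// the Hadoop FileSystem API instead of java.io.File.
class RecursiveLister {
    static List<File> listFilesRecursively(File root) {
        List<File> result = new ArrayList<>();
        File[] entries = root.listFiles();
        if (entries == null) {
            return result; // root is not a directory, or an I/O error occurred
        }
        for (File e : entries) {
            if (e.isDirectory()) {
                result.addAll(listFilesRecursively(e)); // descend
            } else {
                result.add(e);
            }
        }
        return result;
    }
}
```

The collected paths could then be joined with commas and passed in a single call, since sc.textFile accepts a comma-separated list of paths.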
UPDATE
sc.textFile() invokes the Hadoop FileInputFormat via its subclass TextInputFormat. Internally, the logic to do a recursive directory read does exist: it first detects whether an entry is a directory and, if so, descends into it:
    for (FileStatus globStat : matches) {
      if (globStat.isDir()) {
        for (FileStatus stat : fs.listStatus(globStat.getPath(), inputFilter)) {
          result.add(stat);
        }
      } else {
        result.add(globStat);
      }
    }
However, when invoking sc.textFile on such a path, there are errors on the directory entries: "not a file". This behavior is confusing, given that proper support for handling directories appears to be in place.
Solution

I was looking at an old version of FileInputFormat.
BEFORE setting the recursive config mapreduce.input.fileinputformat.input.dir.recursive:

    scala> sc.textFile("dev/*").count
    java.io.IOException: Not a file: file:/shared/sparkup/dev/audit-release/blank_maven_build
The default is null / not set, which is evaluated as "false":

    scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
    res1: String = null
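An unset key behaving as "false" is the usual default-on-lookup pattern. Roughly (a sketch of the semantics only, not Hadoop's actual Configuration code):

```java
import java.util.Map;

// Sketch of Configuration.getBoolean-style semantics: an unset key
// (null lookup) falls back to the supplied default instead of failing.
class ConfSketch {
    static boolean getBoolean(Map<String, String> settings, String key, boolean defaultValue) {
        String raw = settings.get(key); // null when the key was never set
        return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
    }
}
```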
AFTER:

Now set the value:

    sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")

    scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
    res4: String = true
Now retry the recursive operation:
    scala> sc.textFile("dev/*/*").count
    res5: Long = 3481

So it works.
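A note on the glob depth: each * matches exactly one path segment and does not cross "/", so dev/* and dev/*/* reach different depths; the recursive flag then descends below whatever the glob matched. java.nio's glob matcher is not Hadoop's implementation, but this single-segment behavior of * is the same (paths below are taken from the error message above):

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;

// "*" in a glob matches a single path segment; it never crosses "/",
// so "dev/*" and "dev/*/*" match at different directory depths.
class GlobDepth {
    static final PathMatcher ONE_LEVEL =
        FileSystems.getDefault().getPathMatcher("glob:dev/*");
    static final PathMatcher TWO_LEVEL =
        FileSystems.getDefault().getPathMatcher("glob:dev/*/*");
}
```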
Update: added "/" for full recursion, per a comment by @Ben.