How to make Spark session read all the files recursively?


Question

Displaying the directories under which JSON files are stored:

$ tree -d try/
try/
├── 10thOct_logs1
├── 11thOct
│   └── logs2
└── Oct
    └── 12th
        └── logs3

The task is to read all the logs using SparkSession.

Is there an elegant way to read through all the files in directories and then sub-directories recursively?

A few commands that I tried are prone to cause unintentional exclusion.

spark.read.json("file:///var/foo/try/<exp>")

+----------+---+-----+-------+
| <exp> -> | * | */* | */*/* |
+----------+---+-----+-------+
| logs1    | y | y   | n     |
| logs2    | n | y   | y     |
| logs3    | n | n   | y     |
+----------+---+-----+-------+

You can see in the above table that none of the three expressions matches all the directories (located at three different depths) at the same time. Frankly speaking, I was not expecting the third expression */*/* to exclude 10thOct_logs1.

This leads me to conclude that only file or directory paths matching the expression after the last / are considered, as an exact-depth match, and everything else is ignored.
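One way to verify this behaviour is to list exactly which paths a given glob expands to, using Hadoop's FileSystem.globStatus. A minimal diagnostic sketch (not from the original question; it assumes spark is an active SparkSession):

import org.apache.hadoop.fs.{FileSystem, Path}

// print every path the glob expression actually matches
val pattern = new Path("file:///var/foo/try/*/*")
val fs = pattern.getFileSystem(spark.sparkContext.hadoopConfiguration)
fs.globStatus(pattern).foreach(status => println(status.getPath))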

Answer

Update

Spark 3 introduced a new option, recursiveFileLookup, to read from nested folders:

spark.read.option("recursiveFileLookup", "true").json("file:///var/foo/try")
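
If only a specific file type should be picked up, recursiveFileLookup can be combined with the pathGlobFilter option. A sketch, assuming the logs all carry a .json extension:

// restrict the recursive lookup to files matching *.json (assumed extension)
spark.read
  .option("recursiveFileLookup", "true")
  .option("pathGlobFilter", "*.json")
  .json("file:///var/foo/try")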


For older versions, you can alternatively use Hadoop listFiles to recursively list all the file paths and then pass them to Spark read:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.functions.input_file_name

val conf = sc.hadoopConfiguration

// get all file paths recursively (the second argument enables recursion)
val fromFolder = new Path("file:///var/foo/try/")
val logfiles = fromFolder.getFileSystem(conf).listFiles(fromFolder, true)
var files = Seq[String]()
while (logfiles.hasNext) {
  // one can filter here some specific files
  files = files :+ logfiles.next().getPath().toString
}

// read multiple paths
val df = spark.read.csv(files: _*)

df.select(input_file_name()).distinct().show(false)


+-------------------------------------+
|input_file_name()                    |
+-------------------------------------+
|file:///var/foo/try/11thOct/log2.csv |
|file:///var/foo/try/10thOct_logs1.csv|
|file:///var/foo/try/Oct/12th/log3.csv|
+-------------------------------------+
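
As the comment in the loop hints, files can be filtered while iterating. For example, to keep only JSON files, the loop body might become (a sketch, assuming a .json extension):

while (logfiles.hasNext) {
  val path = logfiles.next().getPath().toString
  // hypothetical filter: keep only paths ending in .json
  if (path.endsWith(".json")) files = files :+ path
}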

