Check if file exists in HDFS path?
Question
How can I check if a file exists, given a certain base path? I am providing the method a file list, for example: file1.snappy, file2.snappy, ...
I need to check whether each file exists in either of the given paths, for example hdfs://a/b/c/source/file1.snappy or hdfs://a/b/c/target/file1.snappy. How can I update/modify the method below to accept /a/b/c/target/ or /a/b/c/source/ as a base path and check whether the file already exists? If it exists in source, add it to a sourceList; if it is in the destination, add it to a destinationList.
val fs = FileSystem.get(sprk.sparkContext.hadoopConfiguration)

def fileExists(fileList: Array[String]): Boolean = {
  var fileNotFound = 0
  fileList.foreach { file =>
    if (!fs.exists(new Path(file))) fileNotFound += 1
    print("fileList", file)
  }
  if (fileNotFound > 0) {
    println(fileNotFound + ": number of files not found, probably moved")
    false
  } else
    true
}
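One way to adapt the method along the lines the question asks for is to take the two base paths as parameters and collect matches into separate lists. The sketch below uses hypothetical names (classifyFiles, sourceBase, targetBase) and abstracts the existence check as a function parameter so the logic is testable without a live HDFS; in real use you would pass file => fs.exists(new Path(file)):

```scala
// Sketch: for each bare file name, check it under both base paths and
// collect it into a source list and/or a target list.
// `exists` is abstracted out; in practice pass: p => fs.exists(new Path(p))
def classifyFiles(fileList: Seq[String],
                  sourceBase: String,
                  targetBase: String,
                  exists: String => Boolean): (Seq[String], Seq[String]) = {
  val sourceList = fileList.filter(f => exists(sourceBase + f))
  val targetList = fileList.filter(f => exists(targetBase + f))
  (sourceList, targetList)
}
```

Calling it with "hdfs://a/b/c/source/" and "hdfs://a/b/c/target/" as the base paths then yields the sourceList and destination list the question describes.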
PS: Dear readers, please upvote my question if this helped. It would help me :)
Answer
I have a source dir and a target dir like the example below. Try this approach for a recursive lookup:
URI.create(...) is very important when you are dealing with S3 objects (it will also work with HDFS / local FS).
import java.net.URI

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

/**
 * getAllFiles - get all files recursively from the sub folders.
 *
 * @param path String
 * @param sc   SparkContext
 * @return Seq[String]
 */
def getAllFiles(path: String, sc: SparkContext): Seq[String] = {
  val conf = sc.hadoopConfiguration
  val fs = FileSystem.get(URI.create(path), conf)
  val files: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(path), true) // true for recursive lookup
  val buf = new ArrayBuffer[String]
  while (files.hasNext()) {
    val fileStatus = files.next()
    buf.append(fileStatus.getPath().toString)
  }
  buf.toSeq
}
Example usage:
val spark: SparkSession = SparkSession.builder.appName(getClass.getName)
.master("local[*]").getOrCreate
val sc = spark.sparkContext
val myfiles: Seq[String] = getAllFiles("data/test_table", sc)
myfiles.foreach(println)
println(myfiles.contains("/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy.parquet"))
Result:
/data/test_table/target/part-00000-9205704a-cb0c-4933-87d4-c21313e76297-c000.snappy.parquet
/data/test_table/target/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy1111.parquet
/data/test_table/target/part-00000-9205704a-cb0c-4933-87d4-c21313e76297-c000.snappy11.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy1.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy111.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy.parquet
true
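Given the full listing returned by getAllFiles, the source/target split the question asks for can then be done with plain string matching on the paths. This is a minimal sketch (the helper name splitBySourceTarget is hypothetical; it assumes the directory components /source/ and /target/ appear in each path, as in the output above):

```scala
// Sketch: partition a recursive file listing into source and target lists
// based on which directory component each path contains.
def splitBySourceTarget(allFiles: Seq[String]): (Seq[String], Seq[String]) = {
  val sourceList = allFiles.filter(_.contains("/source/"))
  val targetList = allFiles.filter(_.contains("/target/"))
  (sourceList, targetList)
}
```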