Check if file exists in HDFS path?


Problem Description

How can I check whether a file exists, given a certain base path? I am passing the method a list of file names, for example: file1.snappy, file2.snappy, ...

I need to check whether a file exists in either of the given paths, for example hdfs://a/b/c/source/file1.snappy or hdfs://a/b/c/target/file1.snappy. How can I update/modify the method below to accept /a/b/c/source/ or /a/b/c/target/ as a base path and check whether the file already exists? If it exists in source, add it to a source list; if it exists in target, add it to a target list.

  import org.apache.hadoop.fs.{FileSystem, Path}

  // `sprk` is the SparkSession; reuse its Hadoop configuration to talk to HDFS
  val fs = FileSystem.get(sprk.sparkContext.hadoopConfiguration)

  def fileExists(fileList: Array[String]): Boolean = {
    var fileNotFound = 0
    fileList.foreach { file =>
      if (!fs.exists(new Path(file))) fileNotFound += 1
      println(s"fileList: $file")
    }
    if (fileNotFound > 0) {
      println(s"$fileNotFound file(s) not found, probably moved")
      false
    } else
      true
  }
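A minimal sketch of the intended behaviour (the names classifyFiles, sourceBase and targetBase are hypothetical, not part of the method above): probe each bare file name against both base paths with the existing fs and collect the full path into the matching list.

  import org.apache.hadoop.fs.Path
  import scala.collection.mutable.ArrayBuffer

  // Hypothetical sketch: for each file name, check both base paths with `fs`
  // (defined above) and collect the resolved path into the matching list.
  def classifyFiles(fileNames: Array[String],
                    sourceBase: String,  // e.g. "hdfs://a/b/c/source/"
                    targetBase: String   // e.g. "hdfs://a/b/c/target/"
                   ): (Seq[String], Seq[String]) = {
    val sourceList = ArrayBuffer[String]()
    val targetList = ArrayBuffer[String]()
    fileNames.foreach { name =>
      if (fs.exists(new Path(sourceBase + name))) sourceList += sourceBase + name
      if (fs.exists(new Path(targetBase + name))) targetList += targetBase + name
    }
    (sourceList.toSeq, targetList.toSeq)
  }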

PS: Dear readers, please upvote my question if this helped. It would help me :)

Recommended Answer

I have a source dir and a target dir laid out like the example below.

Try this approach for a recursive lookup.

URI.create(...) is very important when you are dealing with S3 objects (it also works with HDFS / local fs).
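A small illustration of that point (the bucket and namenode names below are placeholders, and s3a additionally needs hadoop-aws plus credentials configured): the scheme in the URI decides which FileSystem implementation Hadoop hands back.

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem

// The URI scheme (s3a://, hdfs://, file://) selects the FileSystem implementation;
// the bucket and namenode names here are placeholders.
val conf = new Configuration()
val s3Fs    = FileSystem.get(URI.create("s3a://my-bucket/data/"), conf)
val hdfsFs  = FileSystem.get(URI.create("hdfs://namenode:8020/data/"), conf)
val localFs = FileSystem.get(URI.create("file:///tmp/data/"), conf)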

import java.net.URI

import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path, RemoteIterator}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ArrayBuffer

  /**
    * getAllFiles - get all files recursively from the sub folders.
    *
    * @param path String
    * @param sc SparkContext
    * @return Seq[String]
    */
  def getAllFiles(path: String, sc: SparkContext): Seq[String] = {
    val conf = sc.hadoopConfiguration
    val fs = FileSystem.get(URI.create(path), conf)
    val files: RemoteIterator[LocatedFileStatus] = fs.listFiles(new Path(path), true) // true for recursive lookup
    val buf = new ArrayBuffer[String]
    while (files.hasNext()) {
      val fileStatus = files.next()
      buf.append(fileStatus.getPath().toString)
    }
    buf.toSeq
  }

Example usage:

 val spark: SparkSession = SparkSession.builder.appName(getClass.getName)
    .master("local[*]").getOrCreate


  val sc = spark.sparkContext

  val myfiles: Seq[String] = getAllFiles("data/test_table", sc)
  myfiles.foreach(println)
  println(myfiles.contains("/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy.parquet"))

Result:

/data/test_table/target/part-00000-9205704a-cb0c-4933-87d4-c21313e76297-c000.snappy.parquet
/data/test_table/target/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy1111.parquet
/data/test_table/target/part-00000-9205704a-cb0c-4933-87d4-c21313e76297-c000.snappy11.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy1.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy111.parquet
/data/test_table/source/part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy.parquet


true
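To connect this back to the source/target split asked about in the question, one simple follow-up (a sketch only; partition and the helper existsIn are illustrative additions, not part of the answer above) is to split the listing by base directory and then test membership by file name:

  // Sketch: split the recursive listing into source vs. target paths,
  // using the "/source/" and "/target/" directory layout shown above.
  val (sourceList, targetList) = myfiles.partition(_.contains("/source/"))

  // A bare file name is "present in source" if some source path ends with it.
  def existsIn(paths: Seq[String], fileName: String): Boolean =
    paths.exists(_.endsWith("/" + fileName))

  println(existsIn(sourceList, "part-00000-19b67f0c-3fb0-4718-8a31-ac770e2dc0ba-c000.snappy.parquet"))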
