How to get file metadata when retrieving data from HDFS?

Question

I am requesting data from HDFS, and I would like to get the metadata of the files from which the rows were read. This will allow me to build reports that reflect the data that was available at a given moment.

I found a solution that uses org.apache.hadoop.fs.FileSystem to get a listing of all files. I know the partitioning rules, and based on the received listing I can build the mapping row -> meta.
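For reference, getting such a listing might look like the following minimal sketch (the <path> placeholder and the fileToMtime name are illustrative, and it assumes the files sit directly under that directory):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Connect to the filesystem configured for this cluster (HDFS here).
val fs = FileSystem.get(new Configuration())

// Map every file under the input directory to its modification time.
val fileToMtime: Map[String, Long] =
  fs.listStatus(new Path("<path>"))
    .filter(_.isFile)
    .map(status => status.getPath.toString -> status.getModificationTime)
    .toMap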

But this approach seems difficult to implement and maintain. Are there simpler ways to achieve the same result?

Answer

The easiest way to do this is with the Spark function input_file_name.

import scala.collection.mutable.Map
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.types.LongType
import spark.implicits._

// Tag every row with the file it came from, and repartition so that
// rows from the same file land in the same partition.
val df = spark.read.text("<path>")
  .withColumn("input_file_name", input_file_name())
  .repartition($"input_file_name")

// For each partition, look up the modification time of each distinct
// file once, cache it, and append it to every row.
def getMetadata(rows: Iterator[Row]) = {
  val cache = Map[String, Long]()
  val fs = FileSystem.get(new Configuration())
  rows.map { row =>
    // input_file_name was added last, so the path is the final field.
    val path = row.getString(row.size - 1)
    if (!cache.contains(path)) {
      cache.put(path, fs.listStatus(new Path(path))(0).getModificationTime())
    }
    Row.fromSeq(row.toSeq ++ Array[Any](cache(path)))
  }
}

spark.createDataFrame(df.rdd.mapPartitions(getMetadata), df.schema.add("modified_ts", LongType))
  .show(10000, false)

Here modified_ts is the mtime of the file.
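Since getModificationTime returns epoch milliseconds, you could cast the column to a readable timestamp; a small sketch, assuming the DataFrame built above is bound to a val named result:

import spark.implicits._

// modified_ts holds epoch milliseconds; divide by 1000 before casting.
val withTimestamp = result.withColumn("modified_time", ($"modified_ts" / 1000).cast("timestamp"))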

Depending on the size of the data, you can also do it with a join. The logic will look something like:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions._
import spark.implicits._

// Returns the modification time (epoch milliseconds) of a file path.
val mtime = (path: String) => FileSystem.get(new Configuration()).listStatus(new Path(path)).head.getModificationTime
val mtimeUDF = udf(mtime)

// Tag every row with the file it was read from.
val df = spark.read.text("<path>").withColumn("input_file_name", input_file_name())

// Compute the mtime once per distinct file ...
val metadata_df = df.select($"input_file_name").distinct().withColumn("mtime", mtimeUDF($"input_file_name"))

// ... and join it back onto the data rows.
val rows_with_metadata = df.join(metadata_df, "input_file_name")
rows_with_metadata.show(false)
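The trade-off between the two versions: the mapPartitions variant looks up each file's mtime at most once per partition, at the cost of a repartition by file name, while the join variant calls the filesystem exactly once per distinct file but pays for the distinct and the join.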
