How to get file metadata when retrieving data from HDFS?


Problem description

I request data from HDFS, and I would like to get the metadata of the files it was read from. This will allow me to build reports that reflect the data that was available at a given moment.

I found a solution that uses org.apache.hadoop.fs.FileSystem to get a listing of all the files. I know the partitioning rule, so based on the received listing I can build a row -> meta mapping.
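For reference, a minimal sketch of that listing approach (with "<path>" standing in for the real HDFS directory, as in the answer below):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch only: list the files under one directory and print each file's
// path and modification time.
val fs = FileSystem.get(new Configuration())
fs.listStatus(new Path("<path>")).foreach { status =>
  println(s"${status.getPath} modified at ${status.getModificationTime}")
}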

But this approach seems difficult to implement and maintain. Maybe there is a simpler way to achieve the same result?

Recommended answer

The easiest way to do this is with Spark's built-in input_file_name function, which records the file each row was read from.

import scala.collection.mutable.Map
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.types.LongType
import spark.implicits._ // for the $"..." column syntax (available by default in spark-shell)

// Tag every row with the file it was read from, then repartition so that
// rows from the same file land in the same partition.
val df = spark.read.text("<path>")
  .withColumn("input_file_name", input_file_name())
  .repartition($"input_file_name")

// For each partition, look up every distinct file's modification time once
// and append it to each row.
def getMetadata(rows: Iterator[Row]) = {
  val cache = Map[String, Long]()
  val fs = FileSystem.get(new Configuration())
  rows.map { row =>
    val path = row.getString(row.size - 1)
    if (!cache.contains(path)) {
      cache.put(path, fs.listStatus(new Path(path))(0).getModificationTime())
    }
    Row.fromSeq(row.toSeq ++ Array[Any](cache(path)))
  }
}

spark.createDataFrame(df.rdd.mapPartitions(getMetadata), df.schema.add("modified_ts", LongType))
  .show(10000, false)

Here modified_ts is the file's mtime (its modification time, in milliseconds since the epoch).
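Since each input_file_name value points at a single file rather than a directory, an equivalent lookup for that line (a sketch, not part of the original answer) is fs.getFileStatus:

// Equivalent single-file lookup (sketch):
cache.put(path, fs.getFileStatus(new Path(path)).getModificationTime)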

Depending on the size of the data, you can also do it with a join. The logic looks something like this:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"..." column syntax (available by default in spark-shell)

// UDF that looks up a file's modification time on HDFS.
val mtime = (path: String) => FileSystem.get(new Configuration()).listStatus(new Path(path)).head.getModificationTime
val mtimeUDF = udf(mtime)

val df = spark.read.text("<path>").withColumn("input_file_name", input_file_name())

// Look up the mtime once per distinct file, then join it back onto the rows.
val metadata_df = df.select($"input_file_name").distinct().withColumn("mtime", mtimeUDF($"input_file_name"))

val rows_with_metadata = df.join(metadata_df, "input_file_name")
rows_with_metadata.show(false)
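If a human-readable timestamp is more convenient than raw milliseconds, one possible follow-up (a sketch: getModificationTime returns epoch milliseconds, while from_unixtime expects seconds):

// Convert epoch milliseconds to a "yyyy-MM-dd HH:mm:ss" string (sketch).
rows_with_metadata
  .withColumn("mtime_ts", from_unixtime($"mtime" / 1000))
  .show(false)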

