Accessing information (metadata) in the file name & type in a Beam pipeline


Problem description


My file names contain information that I need in my pipeline: the identifier for my data points is part of the file name, not a field in the data. For example, every wind turbine generates a file such as turbine-loc-001-007.csv, and I need the loc data within the pipeline.
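Whichever way the files are read, the core step is pulling the identifier out of the path. A minimal, framework-independent sketch, assuming the file names follow the turbine-loc-001-007.csv pattern from the example and that "loc" means the two numeric groups together (the pattern and the helper name loc_from_path are hypothetical; the question does not define what the numbers mean):

```python
import re

# Assumed layout, based on the example file name: turbine-loc-<part1>-<part2>.csv.
# The meaning of the two numeric parts is a guess; adjust to the real naming scheme.
_TURBINE_FILE = re.compile(r"^turbine-loc-(?P<part1>\d+)-(?P<part2>\d+)\.csv$")


def loc_from_path(path):
    """Return the loc identifier embedded in a turbine file name, or None if it does not match."""
    # Local paths and URIs such as gs://bucket/dir/file.csv both use '/' as separator.
    file_name = path.rstrip("/").rsplit("/", 1)[-1]
    match = _TURBINE_FILE.match(file_name)
    if match is None:
        return None
    return f"{match.group('part1')}-{match.group('part2')}"
```

For example, loc_from_path("gs://bucket/data/turbine-loc-001-007.csv") returns "001-007". Keeping the parsing in a plain function makes it easy to unit-test separately from the pipeline code.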

Recommended answer


Java (sdk 2.9.0):


Beam's TextIO readers do not give access to the file name itself. For these use cases, we need to use FileIO to match the files and gain access to the information stored in the file name. Unlike with TextIO, reading the file is the user's responsibility, handled in transforms downstream of the FileIO read. The result of a FileIO read is a PCollection<ReadableFile>; the ReadableFile class contains the file name as metadata, which can be used along with the contents of the file.


FileIO's ReadableFile does have a convenience method, readFullyAsUTF8String(), which reads the entire file into a String object; note that this loads the whole file into memory first. If memory is a concern, you can work with the file directly using utility classes like FileSystems.

Adapted from the FileIO documentation:

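PCollection<KV<String, String>> filesAndContents = p
     .apply(FileIO.match().filepattern("hdfs://path/to/*.gz"))
     // withCompression can be omitted - by default compression is detected from the filename.
     .apply(FileIO.readMatches().withCompression(Compression.GZIP))
     .apply(MapElements
         // uses static imports of kvs() and strings() from TypeDescriptors
         .into(kvs(strings(), strings()))
         .via((ReadableFile f) -> {
           try {
             // Key: the full path of the file (its metadata); value: the file contents.
             return KV.of(
                 f.getMetadata().resourceId().toString(), f.readFullyAsUTF8String());
           } catch (IOException e) {
             // readFullyAsUTF8String() throws a checked IOException, which the lambda must handle.
             throw new RuntimeException("Failed to read " + f.getMetadata().resourceId(), e);
           }
         }));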


Python (sdk 2.9.0):


For Python in SDK 2.9.0, you need to collect the list of URIs outside of the Dataflow pipeline and feed it into the pipeline as a parameter, for example by using FileSystems to expand a glob pattern into a list of files and then passing that list to a PCollection for processing.


Once fileio (see PR https://github.com/apache/beam/pull/7791/) is available, the following code is also an option for Python.

import apache_beam as beam
from apache_beam.io import fileio

with beam.Pipeline() as p:
  readable_files = (p
                    | fileio.MatchFiles('hdfs://path/to/*.txt')
                    | fileio.ReadMatches()
                    # Breaks fusion so the matched files can be read in parallel.
                    | beam.Reshuffle())
  # Each element becomes a (full file path, file contents as str) pair.
  files_and_contents = (readable_files
                        | beam.Map(lambda x: (x.metadata.path,
                                              x.read_utf8())))
