Apache Beam TextIO.ReadAll: How to emit KeyValue instead of a PCollection of String

Problem description

The pipeline starts by reading from PubsubIO. Each Pub/Sub message contains a GCS file path. I know that I can use ReadAll() to emit the lines from each path. However, that doesn't serve my purpose, because the information about the file path is lost. What I need is to emit a KV<'Filepath', 'Lines inside files'>.

The Pub/Sub messages look like this:

Message1 -> gs://folder1/Topic1/topicfile1.gz
Message2 -> gs://folder1/Topic2/topicfile2.gz

Assume the file contents are as follows:

topicfile1.gz
{
topic1.line1
topic1.line2
}

topicfile2.gz
{
topic2.line1
topic2.line2
}

What I expect is a PCollection like the one below:

{KV<'gs://folder1/Topic1/topicfile1.gz','topic1.line1'>}
{KV<'gs://folder1/Topic1/topicfile1.gz','topic1.line2'>}
{KV<'gs://folder1/Topic2/topicfile2.gz','topic2.line1'>}
{KV<'gs://folder1/Topic2/topicfile2.gz','topic2.line2'>}

I couldn't find a way to read a file from a path inside a ParDo function so that I can map the path to its lines.

Hope this is clear.

Recommended answer

If I understood the question correctly, I don't think TextIO supports this out of the box.

Details

When you apply a transform like readAll(), several steps are involved between getting the initial file paths from the IO and finally emitting all the lines from all the files.

For example, this is the logic in TextIO.ReadAll:

  • it accepts a PCollection of file paths (or path patterns);
  • it applies FileIO.matchAll(), which converts the PCollection of path patterns into a PCollection of MatchResult.Metadata objects that describe those paths;
  • then it applies FileIO.readMatches(), which converts the metadata objects into ReadableFile objects that describe specific files;
  • lastly, it applies TextIO.readFiles(), which takes in a ReadableFile and outputs all the strings from that file;
    • this last step is where you would want to add the file path to the output, so that you know which string comes from which file. It would help if there were an option to make this step emit KV<ReadableFile, String> instead of just strings, so that you could get the file path from the ReadableFile's metadata (ReadableFile.getMetadata()).

From looking at that code, emitting the raw lines from the files seems to be the only way TextIO currently supports.
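To make the steps above concrete, here is a minimal sketch, assuming the Beam Java SDK, of a composite transform that chains the same three steps explicitly. ReadAllExpanded is a hypothetical name, and this only approximates TextIO.ReadAll with default settings; as far as I know the real one also forwards its own options (such as compression and the delimiter) to these steps. The types show where the path disappears: after TextIO.readFiles() the result is a plain PCollection<String>.

```java
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;

/**
 * Hypothetical transform that spells out, with default settings, roughly what
 * TextIO.readAll() does internally: file patterns in, lines out.
 */
public class ReadAllExpanded extends PTransform<PCollection<String>, PCollection<String>> {

  @Override
  public PCollection<String> expand(PCollection<String> filepatterns) {
    return filepatterns
        // Step 1: file paths/patterns -> MatchResult.Metadata for each matching file.
        .apply(FileIO.matchAll())
        // Step 2: MatchResult.Metadata -> FileIO.ReadableFile.
        .apply(FileIO.readMatches())
        // Step 3: ReadableFile -> lines. The output is a plain PCollection<String>,
        // so the path available via ReadableFile.getMetadata() is dropped here.
        .apply(TextIO.readFiles());
  }
}
```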

Workaround

Probably the most straightforward way is to write your own PTransform similar to TextIO.ReadAll. It would work roughly like this:

High level:

  • create your own customized version of TextIO.ReadAll;
  • do the same for ReadAllViaFileBasedSource;
  • change your version of ReadAllViaFileBasedSource to emit what you want;
  • use your custom TextIO.ReadAll, which relies on your custom ReadAllViaFileBasedSource to emit the correct output;

In a bit more detail:

  • just copy the whole TextIO.ReadAll; it's a fairly short wrapper around FileIO that implements the steps I mentioned above;
  • but in expand(), at the last step, apply your own logic instead of readFiles() so that it emits the KVs you want:
    • readFiles() is currently implemented by ReadAllViaFileBasedSource;
    • ReadAllViaFileBasedSource seems to be the part that actually converts ReadableFiles into strings;
    • you would create a copy of ReadAllViaFileBasedSource and change its output logic so that, instead of emitting only the lines of the file, it also emits the file's metadata.
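If copying TextIO.ReadAll and ReadAllViaFileBasedSource feels heavy, a simpler alternative (a swapped-in technique, not the copy-and-modify approach described in the list above) is to keep the two FileIO steps and replace the last one with your own DoFn that reads each ReadableFile and emits KV<path, line>. This is a minimal sketch assuming the Beam Java SDK and UTF-8 text; ReadAllWithPath and ReadLinesWithPathFn are hypothetical names. The trade-off: each file is read whole inside a single DoFn call, so unlike ReadAllViaFileBasedSource a large uncompressed file is not split into ranges across workers. For .gz files like the ones in the question this matters little, since gzip files cannot be split anyway.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/** Hypothetical readAll() variant: file patterns in, KV<file path, line> out. */
public class ReadAllWithPath
    extends PTransform<PCollection<String>, PCollection<KV<String, String>>> {

  @Override
  public PCollection<KV<String, String>> expand(PCollection<String> filepatterns) {
    return filepatterns
        // Same first step as TextIO.readAll(): expand paths/patterns into file metadata.
        .apply(FileIO.matchAll())
        // readMatches() uses Compression.AUTO by default, so .gz files are
        // decompressed based on their extension when opened below.
        .apply(FileIO.readMatches())
        // Replaces TextIO.readFiles(): emit (path, line) pairs instead of bare lines.
        .apply(ParDo.of(new ReadLinesWithPathFn()));
  }

  /** Reads one whole file per element; files are not split across workers. */
  public static class ReadLinesWithPathFn extends DoFn<FileIO.ReadableFile, KV<String, String>> {
    @ProcessElement
    public void processElement(
        @Element FileIO.ReadableFile file, OutputReceiver<KV<String, String>> out)
        throws IOException {
      // Full path of the file, e.g. gs://folder1/Topic1/topicfile1.gz for a GCS file.
      String path = file.getMetadata().resourceId().toString();
      // open() returns a channel that is already decompressed per the file's compression.
      try (BufferedReader reader =
          new BufferedReader(
              new InputStreamReader(Channels.newInputStream(file.open()), StandardCharsets.UTF_8))) {
        String line;
        while ((line = reader.readLine()) != null) {
          out.output(KV.of(path, line));
        }
      }
    }
  }
}
```

With this in place, the pipeline from the question could apply it directly to the Pub/Sub messages, for example pipeline.apply(PubsubIO.readStrings().fromSubscription(subscription)).apply(new ReadAllWithPath()), where subscription is a placeholder for your subscription path. Each element of the result would then pair a line with the gs:// path of the file it came from, as in the expected output in the question.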
