Google Cloud Dataflow: Read from a file with a dynamic filename


Problem Description

I am trying to build a pipeline on Google Cloud Dataflow that would do the following:

  • Listen for events on a Pubsub subscription
  • Extract the filename from the event text
  • Read the file (from a Google Cloud Storage bucket)
  • Store the records in BigQuery

Here is the code so far:

Pipeline pipeline = //create pipeline
pipeline.apply("read events", PubsubIO.readStrings().fromSubscription("sub"))
        .apply("Deserialise events", //Code that produces ParDo.SingleOutput<String, KV<String, byte[]>>)
        .apply(TextIO.read().from(""))???

I am struggling with the 3rd step; I am not quite sure how to access the output of the second step and use it in the third. I have tried writing code that produces the following:

private ParDo.SingleOutput<KV<String, byte[]>, TextIO.Read> readFile(){
    // A class that extends DoFn<KV<String, byte[]>, TextIO.Read> and wraps TextIO.read() in its processElement method
}

However, I am not able to read the file content in the subsequent step.

Could anyone please let me know what I need to write in the 3rd and 4th steps so that I can consume the file line by line and store the output in BigQuery (or just log it)?

Answer

The natural way to express your read would be by using the TextIO.readAll() method, which reads text files from an input PCollection of file names. This method has been introduced into the Beam codebase, but it is not in a released version yet. It will be included in the Beam 2.2.0 release and the corresponding Dataflow 2.2.0 release.
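For illustration, here is a minimal sketch of what the full pipeline could look like once TextIO.readAll() is available. This is not the answerer's code: the extractFileName helper, the assumption that each event yields a single GCS path such as "gs://my-bucket/path/file.txt", and the BigQuery table and column names are all placeholders standing in for your deserialisation step.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;

Pipeline pipeline = //create pipeline
pipeline.apply("read events", PubsubIO.readStrings().fromSubscription("sub"))
        // Extract the GCS path from the event text; extractFileName is a
        // hypothetical helper returning e.g. "gs://my-bucket/path/file.txt"
        .apply("extract file names", MapElements
                .into(TypeDescriptors.strings())
                .via((String event) -> extractFileName(event)))
        // Read each named file, producing one element per line
        .apply("read files", TextIO.readAll())
        // Wrap each line in a single-column TableRow (placeholder schema)
        .apply("to table rows", MapElements
                .into(TypeDescriptor.of(TableRow.class))
                .via((String line) -> new TableRow().set("line", line)))
        // Write to an existing table; CREATE_NEVER avoids having to supply a schema
        .apply("write to BigQuery", BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table")
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
pipeline.run();

Because readAll() is a transform applied inside the pipeline, rather than a source configured when the pipeline is constructed, it can consume file names computed at runtime from the unbounded Pubsub stream, which is exactly what a dynamic-filename read needs.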

