Google Cloud Dataflow: Read from a file with dynamic filename


Problem description






I am trying to build a pipeline on Google Cloud Dataflow that would do the following:

  • Listen to events on a Pub/Sub subscription
  • Extract the filename from the event text
  • Read the file (from a Google Cloud Storage bucket)
  • Store the records in BigQuery

Here is the code:

Pipeline pipeline = //create pipeline
pipeline.apply("read events", PubsubIO.readStrings().fromSubscription("sub"))
        .apply("Deserialise events", //Code that produces ParDo.SingleOutput<String, KV<String, byte[]>>)
        .apply(TextIO.read().from(""))???

I am struggling with the 3rd step: I am not sure how to access the output of the second step and use it in the third. I have tried writing code that produces the following:

private ParDo.SingleOutput<KV<String, byte[]>, TextIO.Read> readFile(){
    //A class that extends DoFn<KV<String, byte[]>, TextIO.Read> and has TextIO.read wrapped into processElement method
}

However, I am not able to read the file content in the subsequent step.

Could anyone please let me know what I need to write in the 3rd and 4th steps so that I can consume the file line by line and store the output in BigQuery (or just log it)?

Solution

The natural way to express your read is to use the TextIO.readAll() method, which reads text files from an input PCollection of file names. This method has already been added to the Beam codebase but is not yet in a released version. It will be included in the Beam 2.2.0 release and the corresponding Dataflow 2.2.0 release.
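
As a rough illustration of that suggestion, the sketch below wires the steps from the question together with TextIO.readAll() and simply logs each line. It assumes Beam 2.2.0 or later with the Google Cloud Platform IO module on the classpath. The class name ReadDynamicFiles, the helper toGcsPath, the bucket my-bucket, and the subscription path are all hypothetical placeholders, and the helper assumes the event text is nothing more than an object name, which may not match the real event format.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadDynamicFiles {

  // Hypothetical helper: assumes the event text is just an object name
  // inside a bucket called "my-bucket". Adapt this to the real event format.
  static String toGcsPath(String eventText) {
    return "gs://my-bucket/" + eventText.trim();
  }

  public static void main(String[] args) {
    Pipeline pipeline =
        Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        // Step 1: each Pub/Sub message arrives as a String (placeholder subscription).
        .apply("Read events",
            PubsubIO.readStrings()
                .fromSubscription("projects/my-project/subscriptions/my-sub"))
        // Step 2: turn every event into the path of the file to read.
        .apply("Extract file name",
            MapElements.into(TypeDescriptors.strings())
                .via((String event) -> toGcsPath(event)))
        // Step 3: read every file named in the input PCollection, line by line.
        .apply("Read files", TextIO.readAll())
        // Step 4: log each line; a BigQuery write could take this step's place.
        .apply("Log lines", ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            System.out.println(c.element());
          }
        }));

    pipeline.run();
  }
}
```

The key point is that the file names travel through the pipeline as ordinary elements, so no TextIO.Read object has to be constructed inside a DoFn.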
