Google Cloud Dataflow: Read from a file with dynamic filename
Problem description
I am trying to build a pipeline on Google Cloud Dataflow that would do the following:
- Listen to events on a Pub/Sub subscription
- Extract the file name from the event text
- Read the file (from a Google Cloud Storage bucket)
- Store the records in BigQuery
Following is the code:
Pipeline pipeline = // create pipeline
pipeline.apply("read events", PubsubIO.readStrings().fromSubscription("sub"))
        .apply("Deserialise events", /* code that produces ParDo.SingleOutput<String, KV<String, byte[]>> */)
        .apply(TextIO.read().from("")) // ???
I am struggling with the 3rd step, not quite sure how to access the output of the second step and use it in the 3rd. I have tried writing code that produces the following:
private ParDo.SingleOutput<KV<String, byte[]>, TextIO.Read> readFile() {
    // A class that extends DoFn<KV<String, byte[]>, TextIO.Read> and has
    // TextIO.read wrapped into the processElement method
}
However, I am not able to read the file content in the subsequent step.
Could anyone please let me know what I need to write in the 3rd and 4th steps so that I can consume the file line by line and store the output in BigQuery (or just log it)?
Solution
The natural way to express your read would be to use TextIO.readAll(), which reads text files from an input PCollection of file names. This method has been introduced in the Beam codebase but is not yet in a released version; it will be included in the Beam 2.2.0 release and the corresponding Dataflow 2.2.0 release.
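Putting the pieces together, the four steps can be sketched with TextIO.readAll() as below. This is a minimal sketch, not a tested implementation: the subscription path, bucket name, BigQuery table, and the assumption that the Pub/Sub message payload is itself the file name are all placeholders you would replace with your own extraction logic.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import com.google.api.services.bigquery.model.TableRow;

public class DynamicFileReadPipeline {
  public static void main(String[] args) {
    Pipeline pipeline =
        Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        // 1. Listen to events on the Pub/Sub subscription.
        .apply("read events",
            PubsubIO.readStrings()
                .fromSubscription("projects/my-project/subscriptions/sub"))
        // 2. Turn each event into a GCS file path (placeholder logic:
        //    here the message payload is assumed to be the file name).
        .apply("extract filename",
            MapElements.into(TypeDescriptors.strings())
                .via(event -> "gs://my-bucket/" + event))
        // 3. Read each named file line by line via TextIO.readAll(),
        //    which consumes a PCollection<String> of file patterns.
        .apply("read files", TextIO.readAll())
        // 4. Convert each line to a TableRow and store it in BigQuery.
        .apply("to rows",
            MapElements.into(TypeDescriptor.of(TableRow.class))
                .via(line -> new TableRow().set("line", line)))
        .apply("write to BQ",
            BigQueryIO.writeTableRows()
                .to("my-project:my_dataset.my_table"));

    pipeline.run();
  }
}
```

Note that this replaces the DoFn-returning-TextIO.Read approach from the question entirely: instead of trying to emit a read transform from processElement, the file names flow through the pipeline as ordinary elements and TextIO.readAll() performs the reads.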