Apache Beam: How To Simultaneously Create Many PCollections That Undergo Same PTransform?
Question
Thanks in advance!
[+] The Problem:
I have a lot of files on Google Cloud Storage, and for every file I have to:
- fetch the file
- make a batch of Google-Cloud-Storage API calls on each file to index it (e.g. name = blob.name, size = blob.size)
- unzip it
- search for things inside it
- write the index information plus whatever was found inside the file to a BigQuery table
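The per-file work in that list can be sketched as a plain helper that a `beam.FlatMap` could apply to each (name, compressed bytes) pair. This is a minimal sketch under assumptions: the function name, the dict fields, and the byte-string search are illustrative, not code from the question.

```python
import io
import zipfile


def index_and_search(name, zipped_bytes, pattern):
    """Hypothetical per-file step: index, unzip, search, emit one row.

    Could be applied in a pipeline as, e.g.:
        ... | beam.FlatMap(lambda kv: index_and_search(kv[0], kv[1], b'needle'))
    """
    matches = []
    # Unzip in memory and scan every member line by line.
    with zipfile.ZipFile(io.BytesIO(zipped_bytes)) as zf:
        for member in zf.namelist():
            for line in zf.read(member).splitlines():
                if pattern in line:
                    matches.append(line)
    # One dict per file, shaped like a row for beam.io.WriteToBigQuery.
    yield {'name': name, 'size': len(zipped_bytes), 'matches': matches}
```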
I've been using python2.7 and the Google-Cloud-SDK. This takes hours if I run it linearly. Apache Beam/Dataflow was suggested to me for processing in parallel.
[+] What I've been able to do:
I can read from one file, perform a PTransform, and write to another file.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def loadMyFile(pipeline, path):
    return pipeline | "LOAD" >> beam.io.ReadFromText(path)

def myFilter(request):
    return request

with beam.Pipeline(options=PipelineOptions()) as p:
    data = loadMyFile(p, path)
    output = data | "FILTER" >> beam.Filter(myFilter)
    output | "WRITE" >> beam.io.WriteToText(google_cloud_options.staging_location)
[+] What I want to do:
How can I load many of those files simultaneously, perform the same transform on them in parallel, and then write to BigQuery in parallel?
[+] What I've read:
https://beam.apache.org/documentation/programming-guide/
http://enakai00.hatenablog.com/entry/2016/12/09/104913
Thanks again!
Answer
Ok, so I resolved this by doing the following:
1) get the name of a bucket from somewhere | first PCollection
2) get a list of blobs from that bucket | second PCollection
3) do a FlatMap to get blobs individually from the list | third PCollection
4) do a ParDo that gets the metadata
5) write to BigQuery
My pipeline looks like this:
with beam.Pipeline(options=options) as pipe:
    bucket      = pipe | "GetBucketName" >> beam.io.ReadFromText('gs://example_bucket_eraseme/bucketName.txt')
    listOfBlobs = bucket | "GetListOfBlobs" >> beam.ParDo(ExtractBlobs())
    blob        = listOfBlobs | "SplitBlobsIndividually" >> beam.FlatMap(lambda x: x)
    dic         = blob | "GetMetaData" >> beam.ParDo(ExtractMetadata())
    dic | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
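The `ExtractBlobs` and `ExtractMetadata` DoFns are not shown in the answer. A minimal sketch of what their bodies might do, written as plain functions with an injected storage client so they can be tested without GCS; the names, the injected `client`, and the row fields are assumptions, not the answer author's exact code:

```python
def extract_blobs(bucket_name, client):
    # Step 2/3: emit the bucket's blobs as one list, which the
    # subsequent beam.FlatMap(lambda x: x) splits into single blobs.
    # `client` would be a google.cloud.storage.Client in the real DoFn.
    yield list(client.get_bucket(bucket_name).list_blobs())


def extract_metadata(blob):
    # Step 4: pull the indexable fields off a single blob,
    # shaped as a row for beam.io.WriteToBigQuery.
    return {'name': blob.name, 'size': blob.size}
```

Inside a real pipeline these bodies would live in the `process` methods of `beam.DoFn` subclasses named `ExtractBlobs` and `ExtractMetadata`, matching the `beam.ParDo(...)` calls above.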