Apache Beam: How To Simultaneously Create Many PCollections That Undergo Same PTransform?


Question

Thanks in advance!

[+] The problem:

I have a lot of files on Google Cloud, and for every file I have to:

  1. Fetch the file
  2. Make a bunch of Google Cloud Storage API calls against it to index it (e.g. name = blob.name, size = blob.size)
  3. Decompress it
  4. Search the contents for certain things
  5. Write the index information plus whatever was found inside the file to a BigQuery table

I've been using Python 2.7 and the Google Cloud SDK. This takes hours if I run it linearly, so Apache Beam/Dataflow was suggested to me for processing in parallel.
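For reference, the per-file work looks roughly like this when run linearly (a sketch, assuming the google-cloud-storage client and gzip-compressed files; index_one_file, the search term, and the row fields are illustrative, not from the original code):

import zlib
from google.cloud import storage

def index_one_file(bucket_name, blob_path):
    client = storage.Client()
    blob = client.get_bucket(bucket_name).get_blob(blob_path)  # 1. fetch the file
    row = {'name': blob.name, 'size': blob.size}               # 2. index it via GCS metadata
    data = zlib.decompress(blob.download_as_string(),
                           16 + zlib.MAX_WBITS)                # 3. gunzip in memory
    row['match_found'] = b'needle' in data                     # 4. search the contents
    return row                                                 # 5. one row destined for BigQuery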

[+] What I've been able to do:

I can read from one file, perform a PTransform, and write to another file.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def loadMyFile(pipeline, path):
    return pipeline | "LOAD" >> beam.io.ReadFromText(path)

def myFilter(request):
    return request

with beam.Pipeline(options=PipelineOptions()) as p:
    # 'path' and 'google_cloud_options' are assumed to be defined elsewhere
    data = loadMyFile(p, path)
    output = data | "FILTER" >> beam.Filter(myFilter)
    output | "WRITE" >> beam.io.WriteToText(google_cloud_options.staging_location)

[+] What I want to do:

How can I load many of those files simultaneously, perform the same transform on them in parallel, and then write to BigQuery in parallel?

[Diagram of the operations I'd like to perform]

[+] What I've read:

https://beam.apache.org/documentation/programming-guide/
http://enakai00.hatenablog.com/entry/2016/12/09/104913

Thanks again!

Answer

Ok, so I resolved this by doing the following:

1) Get the name of a bucket from somewhere | first PCollection

2) Get a list of blobs from that bucket | second PCollection

3) Do a FlatMap to get the blobs individually out of the list | third PCollection

4) Do a ParDo that gets the metadata

5) Write to BigQuery

My pipeline looks like this (the table spec and schema passed to WriteToBigQuery below are placeholders):

with beam.Pipeline(options=options) as pipe:
    bucket      = pipe        | "GetBucketName"          >> beam.io.ReadFromText('gs://example_bucket_eraseme/bucketName.txt')
    listOfBlobs = bucket      | "GetListOfBlobs"         >> beam.ParDo(ExtractBlobs())
    blob        = listOfBlobs | "SplitBlobsIndividually" >> beam.FlatMap(lambda x: x)
    dic         = blob        | "GetMetaData"            >> beam.ParDo(ExtractMetadata())
    dic                       | "WriteToBigQuery"        >> beam.io.WriteToBigQuery(
        'my-project:my_dataset.my_table',                # placeholder table spec
        schema='name:STRING,size:INTEGER')               # placeholder schema
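where ExtractBlobs and ExtractMetadata might be sketched roughly like this (a sketch, assuming the google-cloud-storage client; the metadata fields are illustrative):

import apache_beam as beam
from google.cloud import storage

class ExtractBlobs(beam.DoFn):
    def process(self, bucket_name):
        # Turn a bucket name into the list of blobs it contains; the
        # FlatMap(lambda x: x) in the next stage then splits that list
        # into individual blob elements that are processed in parallel.
        client = storage.Client()
        yield list(client.get_bucket(bucket_name.strip()).list_blobs())

class ExtractMetadata(beam.DoFn):
    def process(self, blob):
        # One dict per blob, matching the BigQuery schema above.
        yield {'name': blob.name, 'size': blob.size}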

