如何通过 Apache Beam/Google Cloud DataFlow 中的多个 ParDo 转换处理对本地文件的操作 [英] How to handle operations on local files over multiple ParDo transforms in Apache Beam / Google Cloud DataFlow

查看:19
本文介绍了如何通过 Apache Beam/Google Cloud DataFlow 中的多个 ParDo 转换处理对本地文件的操作的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在为 Google Cloud Dataflow 开发一个 ETL 管道,其中我有几个分支 ParDo 转换,每个转换都需要一个本地音频文件.然后将分支结果合并并导出为文本.

I am developing an ETL pipeline for Google Cloud Dataflow where I have several branching ParDo transforms which each require a local audio file. The branched results are then combined and exported as text.

这最初是一个在单台机器上运行的 Python 脚本,我正尝试使用 GC 数据流来适应 VM 工作线程的并行化.

This was initially a Python script that ran on a single machine that I am attempting to adapt for VM worker parallelisation using GC Dataflow.

提取过程从单个 GCS 存储桶位置下载文件,然后在转换完成后将其删除,以控制存储.这是由于预处理模块需要对文件进行本地访问.这可以通过自己重写一些预处理库来重新设计以处理字节流而不是文件 - 但是,这方面的一些尝试并不顺利,我想首先探索如何处理并行化本地Apache Beam/GC Dataflow 中的文件操作,以便更好地理解框架.

The extraction process downloads the files from a single GCS bucket location then deletes them after the transform is completed to keep storage under control. This is due to the pre-processing module which requires local access to the files. This could be re-engineered to handle a byte stream instead of a file by rewriting some of the pre-processing libraries myself - however, some attempts at this aren't going well and I'd like to explore first how to handle parallelised local file operations in Apache Beam / GC Dataflow in order to understand the framework better.

在这个粗略的实现中,每个分支下载和删除文件,有很多双重处理.在我的实现中,我有 8 个分支,因此每个文件都被下载和删除 8 次.是否可以将 GCS 存储桶安装在每个 worker 上而不是从远程下载文件?

In this rough implementation each branch downloads and deletes the files, with lots of double handling. In my implementation I have 8 branches, so each file is being downloaded and deleted 8 times. Could a GCS bucket instead be mounted on every worker rather than downloading files from the remote?

或者是否有另一种方法来确保工作人员被传递对文件的正确引用,以便:

Or is there another way to ensure workers are being passed the correct reference to a file so that:

  • 单个DownloadFilesDoFn()可以批量下载
  • 然后将PCollection中的本地文件引用散布到所有分支
  • 然后最后的 CleanUpFilesDoFn() 可以删除它们
  • 如何并行化本地文件引用?
  • a single DownloadFilesDoFn() can download a batch
  • then fan out the local file references in PCollection to all the branches
  • and then a final CleanUpFilesDoFn() can remove them
  • How can you parallelise local file references?

如果无法避免本地文件操作,Apache Beam/GC Dataflow 的最佳分支ParDo 策略是什么?

What is the best branched ParDo strategy for Apache Beam / GC Dataflow if local file operations cannot be avoided?

为简单起见,我现有实现的一些示例代码有两个分支.

Some example code of my existing implementation with two branches for simplicity.

# singleton decorator
def singleton(cls):
  instances = {}
  def getinstance():
      if cls not in instances:
          instances[cls] = cls()
      return instances[cls]
  return getinstance

@singleton
class Predict():
  def __init__(self, model):
    '''
    Process audio, reads in filename 
    Returns Prediction
    '''
    self.model = model

  def process(self, filename):
      #simplified pseudocode
      audio = preprocess.load(filename=filename)
      prediction = inference(self.model, audio)
      return prediction

class PredictDoFn(beam.DoFn):
  def __init__(self, model):
    self.localfile, self.model = "", model
    
  def process(self, element):
    # Construct Predict() object singleton per worker
    predict = Predict(self.model)

    subprocess.run(['gsutil','cp',element['GCSPath'],'./'], cwd=cwd, shell=False)
    self.localfile = cwd + "/" + element['GCSPath'].split('/')[-1]

    res = predict.process(self.localfile)
    return [{
        'Index': element['Index'], 
        'Title': element['Title'],
        'File' : element['GCSPath'],
        self.model + 'Prediction': res
        }]    
  def finish_bundle(self):
    subprocess.run(['rm',self.localfile], cwd=cwd, shell=False)


# DoFn to split csv into elements (GSC bucket could be read as a PCollection instead maybe)
class Split(beam.DoFn):
    def process(self, element):
        Index,Title,GCSPath = element.split(",")
        GCSPath = 'gs://mybucket/'+ GCSPath
        return [{
            'Index': int(Index),
            'Title': Title,
            'GCSPath': GCSPath
        }]

管道的简化版本:

with beam.Pipeline(argv=pipeline_args) as p:
    files = 
        ( 
        p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input)
          | 'Parse CSV into Dict' >> beam.ParDo(Split())
        )
    # prediction 1 branch
    preds1 = 
        (
          files | 'Prediction 1' >> beam.ParDo(PredictDoFn(model1))
        )
    # prediction 2 branch
    preds2 = 
        (
          files | 'Prediction 2' >> beam.ParDo(PredictDoFn(model2))
        )
    
    # join branches
    joined = { preds1, preds2 }

    # output to file
    output = 
        ( 
      joined | 'WriteToText' >> beam.io.Write(beam.io.textio.WriteToText(known_args.output))
        )

推荐答案

为了避免重复下载文件,可以将文件内容放入pCollection中.

In order to avoid downloading the files repeatedly, the contents of the files can be put into the pCollection.

class DownloadFilesDoFn(beam.DoFn):
  def __init__(self):
     import re
     self.gcs_path_regex = re.compile(r'gs:\/\/([^\/]+)\/(.*)')

  def start_bundle(self):
     import google.cloud.storage
     self.gcs = google.cloud.storage.Client()

  def process(self, element):
     file_match = self.gcs_path_regex.match(element['GCSPath'])
     bucket = self.gcs.get_bucket(file_match.group(1))
     blob = bucket.get_blob(file_match.group(2))
     element['file_contents'] = blob.download_as_bytes()
     yield element
     

然后 PredictDoFn 变为:

Then PredictDoFn becomes:

class PredictDoFn(beam.DoFn):
  def __init__(self, model):
    self.model = model

  def start_bundle(self):
    self.predict = Predict(self.model)
    
  def process(self, element):
    res = self.predict.process(element['file_contents'])
    return [{
        'Index': element['Index'], 
        'Title': element['Title'],
        'File' : element['GCSPath'],
        self.model + 'Prediction': res
        }]   

和管道:

with beam.Pipeline(argv=pipeline_args) as p:
    files = 
        ( 
        p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input)
          | 'Parse CSV into Dict' >> beam.ParDo(Split())
          | 'Read files' >> beam.ParDo(DownloadFilesDoFn())
        )
    # prediction 1 branch
    preds1 = 
        (
          files | 'Prediction 1' >> beam.ParDo(PredictDoFn(model1))
        )
    # prediction 2 branch
    preds2 = 
        (
          files | 'Prediction 2' >> beam.ParDo(PredictDoFn(model2))
        )
    
    # join branches
    joined = { preds1, preds2 }

    # output to file
    output = 
        ( 
      joined | 'WriteToText' >> beam.io.Write(beam.io.textio.WriteToText(known_args.output))
        )

这篇关于如何通过 Apache Beam/Google Cloud DataFlow 中的多个 ParDo 转换处理对本地文件的操作的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆