Calling beam.io.WriteToBigQuery in a beam.DoFn


Question

I've created a Dataflow template with some parameters. When I write the data to BigQuery, I would like to use these parameters to determine which table it should write to. I've tried calling WriteToBigQuery in a ParDo, as suggested in the following link.

How do I write to Big Query using a runtime value provider in Apache Beam?

The pipeline ran successfully, but it did not create tables or load data into BigQuery. Any idea what the issue might be?

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, DebugOptions
from apache_beam.io.gcp.bigquery import BigQueryDisposition

def run():
  pipeline_options = PipelineOptions()
  pipeline_options.view_as(DebugOptions).experiments = ['use_beam_bq_sink']

  with beam.Pipeline(options=pipeline_options) as p:
    custom_options = pipeline_options.view_as(CustomOptions)

    _ = (
      p
      | beam.Create([None])
      | 'Year to periods' >> beam.ParDo(SplitYearToPeriod(custom_options.year))
      | 'Read plan data' >> beam.ParDo(GetPlanDataByPeriod(custom_options.secret_name))
      | 'Transform record' >> beam.Map(transform_record)
      | 'Write to BQ' >> beam.ParDo(WritePlanDataToBigQuery(custom_options.year))
    )

if __name__ == '__main__':
  run()

class CustomOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument('--year', type=int)
    parser.add_value_provider_argument('--secret_name', type=str)

class WritePlanDataToBigQuery(beam.DoFn):
  def __init__(self, year_vp):
    self._year_vp = year_vp

  def process(self, element):
    year = self._year_vp.get()

    table = f's4c.plan_data_{year}'
    schema = {
      'fields': [ ...some fields properties ]
    }

    beam.io.WriteToBigQuery(
      table=table,
      schema=schema,
      create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
      method=beam.io.WriteToBigQuery.Method.FILE_LOADS
    )

Answer

You have instantiated the PTransform beam.io.gcp.bigquery.WriteToBigQuery inside the process method of your DoFn. There are a couple of problems here:

  • The process method is called for each element of the input PCollection. It is not used for building the pipeline graph, so this approach to dynamically constructing the graph will not work.
  • Once you move it out of the DoFn, you need to apply the PTransform beam.io.gcp.bigquery.WriteToBigQuery to a PCollection for it to have any effect. See the Beam pydoc or the Beam tutorial documentation.
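To illustrate the second point, here is a minimal sketch of applying the write at graph-construction time rather than inside `process`. The dataset `s4c` and the dispositions are taken from the question; `records`, `year`, and `schema` are placeholders, and the table-naming logic is pulled into a plain helper so it can be exercised on its own:

```python
# Sketch only: the write must be applied to a PCollection with `>>` while
# the pipeline graph is being built, never instantiated inside a DoFn.

def plan_data_table(element, year):
    # Build the destination table name (dataset "s4c" from the question).
    # Accepting the element keeps the signature compatible with Beam's
    # per-element table callables, even though it is unused here.
    return f's4c.plan_data_{year}'

# Inside the pipeline (requires apache-beam[gcp]), the transform would be
# applied roughly like this:
#
#   records | 'Write to BQ' >> beam.io.WriteToBigQuery(
#       table=lambda row: plan_data_table(row, year),
#       schema=schema,
#       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
#       write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
#       method=beam.io.WriteToBigQuery.Method.FILE_LOADS)
```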

To create a derived value provider for your table name, you would need a "nested" value provider. Unfortunately, this is not supported by the Python SDK. You can use the value provider option directly, though.
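The sketch below illustrates why the derived name fails and what "using the option directly" means. `FakeValueProvider` is a hypothetical stand-in for Beam's runtime value provider, written here only to show the deferred-`.get()` semantics; it is not a real Beam class:

```python
class FakeValueProvider:
    """Hypothetical stand-in for a runtime ValueProvider: the value does
    not exist while the pipeline graph is being constructed, only when
    the job actually runs."""

    def __init__(self, value=None):
        self._value = value

    def get(self):
        if self._value is None:
            raise RuntimeError('option has no value at construction time')
        return self._value


# Deriving a table name at construction time fails, because .get() has
# nothing to return yet -- this is why a "nested"/derived provider would
# be needed, and the Python SDK does not offer one:
year_option = FakeValueProvider()
try:
    table_name = f's4c.plan_data_{year_option.get()}'
except RuntimeError:
    table_name = None

# Passing the provider object itself works instead: WriteToBigQuery
# accepts a ValueProvider for `table` and calls .get() only when the
# write executes, e.g. (sketch, assuming a --table pipeline option):
#
#   records | beam.io.WriteToBigQuery(table=custom_options.table, ...)
```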

As an advanced option, you may be interested in trying out "Flex Templates", which essentially package your whole program as a Docker image and execute it with parameters.
