Calling beam.io.WriteToBigQuery in a beam.DoFn


Problem Description

I've created a Dataflow template with some parameters. When I write the data to BigQuery, I would like to use these parameters to determine which table it should write to. I've tried calling WriteToBigQuery in a ParDo, as suggested in the following link.

How can I write to Big Query using a runtime value provider in Apache Beam?

The pipeline ran successfully, but it did not create or load any data into BigQuery. Any idea what the issue might be?

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition
from apache_beam.options.pipeline_options import DebugOptions, PipelineOptions


class CustomOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument('--year', type=int)
    parser.add_value_provider_argument('--secret_name', type=str)


class WritePlanDataToBigQuery(beam.DoFn):
  def __init__(self, year_vp):
    self._year_vp = year_vp

  def process(self, element):
    year = self._year_vp.get()

    table = f's4c.plan_data_{year}'
    schema = {
      'fields': [ ...some fields properties ]
    }

    beam.io.WriteToBigQuery(
      table=table,
      schema=schema,
      create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=BigQueryDisposition.WRITE_TRUNCATE,
      method=beam.io.WriteToBigQuery.Method.FILE_LOADS
    )


def run():
  pipeline_options = PipelineOptions()
  pipeline_options.view_as(DebugOptions).experiments = ['use_beam_bq_sink']

  with beam.Pipeline(options=pipeline_options) as p:
    custom_options = pipeline_options.view_as(CustomOptions)

    # SplitYearToPeriod, GetPlanDataByPeriod and transform_record are
    # helper transforms defined elsewhere in the project.
    _ = (
      p
      | beam.Create([None])
      | 'Year to periods' >> beam.ParDo(SplitYearToPeriod(custom_options.year))
      | 'Read plan data' >> beam.ParDo(GetPlanDataByPeriod(custom_options.secret_name))
      | 'Transform record' >> beam.Map(transform_record)
      | 'Write to BQ' >> beam.ParDo(WritePlanDataToBigQuery(custom_options.year))
    )


if __name__ == '__main__':
  run()

Recommended Answer

You have instantiated the PTransform beam.io.gcp.bigquery.WriteToBigQuery inside the process method of your DoFn. There are a couple of problems here:

  • The process method is called for each element of the input PCollection. It is not used for building the pipeline graph. This approach to dynamically constructing the graph will not work.
  • Once you move it out of the DoFn, you need to apply the PTransform beam.io.gcp.bigquery.WriteToBigQuery to a PCollection for it to have any effect. See the Beam pydoc or the Beam tutorial documentation.

To create a derived value provider for your table name, you would need a "nested" value provider. Unfortunately, this is not supported by the Python SDK. You can, however, use a value provider option directly.

As an advanced option, you may be interested in trying out "flex templates", which essentially package your whole program as a Docker image and execute it with parameters.

