数据流模板,可从GCS读取输入和架构作为运行时参数 [英] Dataflow template that reads input and schema from GCS as runtime arguments

查看:78
本文介绍了数据流模板,可从GCS读取输入和架构作为运行时参数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试创建一个使用3个运行时参数的自定义数据流模板.来自gcs和bigquery datasink表的输入文件和架构文件位置.

I am trying to create a custom dataflow template that takes 3 runtime arguments. An input file and schema file location from gcs and bigquery datasink table.

似乎可以使用 beam.io.textio.ReadFromText 方法正确读取输入文件.但是,我需要提供模式文件(而不是通过从gcs读取模式文件来对其进行硬编码.

The input file seems to be read properly using the beam.io.textio.ReadFromText method. However, I need to feed the schema file (instead of hard-coding it inside the template by reading that from gcs as well.

此架构也需要传递给 beam.io.WriteToBigQuery

这是我第一次使用Dataflow,并且正在努力使其正常工作.当将位置作为运行时参数提供时,关于如何将gcs位置读取为字符串的任何想法(在推送数据流模板时,知道运行时参数的get()都会失败).

This is my first time working with Dataflow and I am struggling to make it work. Any ideas on how do I read a gcs location as string when the location is provided as a runtime param (knowing that get() on run time param fails when pushing the Dataflow template).

from __future__ import absolute_import
import logging
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

class TemplateOptions(PipelineOptions):
  """ Class to parse runtime options as required for templating the pipeline """
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument(
      '--input_file',
      dest='input_file',
      type=str,
      required=False,
      help='Google Storage Bucket location of Input file',
      default=''
    )

    parser.add_value_provider_argument(
      '--input_file_schema',
      dest='input_file_schema',
      type=str,
      required=False,
      help='Google Storage Bucket location of Input file schema',
      default=''
    )

    parser.add_value_provider_argument(
      '--bq_table_name',
      dest='bq_table_name',
      type=str,
      required=False,
      help='Output BQ table to write results to',
      default=''
    )

class ParseLine(beam.DoFn):
  """A helper class which contains the logic to translate the file into a
    format BigQuery will accept."""

  def process(self, string_input):
    from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
    import csv

    schema = parse_table_schema_from_json(self.schema)
    field_map = [f for f in schema.fields]
    items = csv.reader(string_input.split('\n'), delimiter=',')
    for item in items:
      values = [x.decode('utf8') for x in item]
      result = {}
      i = 0
      for value in values:
        result[field_map[i].name] = value
        i += 1
      return result

def run(argv=None):
  """The main function which creates the pipeline and runs it."""
  known_args = PipelineOptions().view_as(TemplateOptions)
  pipeline_options = {
    'project': '<project-id>' ,
    'staging_location': '<gcs>/staging',
    'runner': 'DataflowRunner',
    'temp_location': '<gcs>/temp',
    'template_location': '<gcs>/csv-processor'
  }

  pipeline_options = PipelineOptions.from_dictionary(pipeline_options)
  with beam.Pipeline(options=pipeline_options) as p:
    schemaPCollection = (p 
      | 'Read Schema' >> beam.io.textio.ReadFromText(known_args.input_file_schema)
    )

    (p
      | 'Read Input File From GCS' >> beam.io.textio.ReadFromText(known_args.input_file,
                                                skip_header_lines=1)
 ==>     | 'String to BigQuery Row' >> beam.ParDo(ParseLine(), schemaPCollection) <==
      | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            known_args.bq_table_name,
            schema=<NEED THE SCHEMA AS STRING>,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
    )

    p.run().wait_until_finish()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

推荐答案

如果架构文件位于GCS中的已知位置,则可以将 ParDo 添加到直接从GCS读取它的管道中.例如,可以在 ParseLine DoFn start_bundle() [1]实现中完成此操作,以便每个捆绑包仅调用一次(不是每个元素).如果您需要抽象出用于存储架构文件的文件系统(而不仅仅是GCS),则可以使用Beam的 FileSystem abstraction [2].

If the schema file is in a known location in GCS, you can add a ParDo to your pipeline that directly reads it from GCS. For example, this can be done in a start_bundle() [1] implementation of your ParseLine DoFn so that it only get invoked once per bundle (not per element). You can use Beam's FileSystem abstraction[2] if you need to abstract out the file-system that you use to store the schema file (not just GCS).

[1] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/core.py#L504 [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filesystems.py

这篇关于数据流模板,可从GCS读取输入和架构作为运行时参数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆