How can I write to Big Query using a runtime value provider in Apache Beam?

Question

I got this to work using beam.io.WriteToBigQuery with the sink experimental option turned on. I actually had the option on already; my issue was that I was trying to "build" the full table reference from two variables (dataset + table) wrapped in str(). That took the whole value provider argument object as a string instead of calling its get() method to get just the value.
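
To make the str() mistake concrete, here is a rough sketch of the wrong and the working approach (runtime_options and SCHEMA_ADFImpression refer to the code further down in this post):

# Broken: str() at pipeline-construction time stringifies the ValueProvider
# objects themselves; .get() is never called, so the sink receives a bogus
# table reference instead of the runtime value.
bad_table_ref = str(runtime_options.dataset) + '.' + str(runtime_options.table)

# Working: pass a single ValueProvider that already holds the full
# DATASET.TABLE reference and let the sink resolve it at runtime.
write_transform = beam.io.WriteToBigQuery(
    table=runtime_options.table,
    schema=SCHEMA_ADFImpression,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)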

I am trying to generate a Dataflow template to then call from a GCP Cloud Function. (For reference, my Dataflow job is supposed to read a file with a bunch of filenames in it, then read all of those from GCS and write them to BQ.) Because of this I need to write it in such a way that I can use runtime value providers to pass the BigQuery dataset/table.
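
For reference, invoking the template from the Cloud Function looks roughly like the sketch below (a hypothetical example using the googleapiclient Dataflow v1b3 templates.launch call; the project, template path and parameter values are placeholders). The keys under parameters are the add_value_provider_argument names defined further down:

from googleapiclient.discovery import build

def launch_adf_template(request):
    # Uses the Cloud Function's default service account credentials.
    service = build('dataflow', 'v1b3', cache_discovery=False)
    launch = service.projects().locations().templates().launch(
        projectId='MY_PROJECT',                        # placeholder
        location='europe-west1',
        gcsPath='gs://MY_BUCKET/templates/adf_batch',  # placeholder template path
        body={
            'jobName': 'adf-batch-from-cf',
            'parameters': {
                # These map onto the ValueProvider arguments in RuntimeOptions.
                'dataset': 'MY_DATASET',
                'table': 'MY_TABLE',
                'complete_batch': 'gs://MY_BUCKET/batches/batch.csv',
                'files_bucket': 'gs://MY_BUCKET/raw',
            },
        })
    response = launch.execute()
    return str(response)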

At the bottom of my post is my current code, omitting some parts that are not relevant to the question. Pay attention to BQ_flexible_writer(beam.DoFn) specifically - that's where I am trying to "customise" beam.io.WriteToBigQuery so that it accepts the runtime value providers.

My template generates fine, and when I test run the pipeline without supplying runtime variables (relying on the defaults) it succeeds and I see the rows added when looking at the job in the console. However, when checking BigQuery there is no data (I triple checked that the dataset/table name is correct in the logs). Not sure where it goes or what logging I can add to understand what's happening to the elements?

Any ideas what's happening here? Or suggestions on how I can write to BigQuery using runtime variables? Can I even call beam.io.WriteToBigQuery the way I've included it in my DoFn, or do I have to take the actual code behind beam.io.WriteToBigQuery and work with that?

#=========================================================
# Imports shown for completeness; project-specific helpers and constants
# (ProcessCSV, AdfDict, SCHEMA_ADFImpression, COLUMNS_SCHEMA_0, PROJECT_ID,
# GCS_STAGING_LOCATION, GCS_TMP_LOCATION, _millis) are omitted as noted above.
import logging

import pandas as pd

import apache_beam as beam
from apache_beam.options.pipeline_options import (
    DebugOptions, GoogleCloudOptions, PipelineOptions, SetupOptions,
    StandardOptions, WorkerOptions)


class BQ_flexible_writer(beam.DoFn):
    def __init__(self, dataset, table):
        self.dataset = dataset
        self.table = table

    def process(self, element):
        dataset_res = self.dataset.get()
        table_res = self.table.get()
        logging.info('Writing to table: {}.{}'.format(dataset_res,table_res))
        beam.io.WriteToBigQuery(
            #dataset= runtime_options.dataset,
            table = str(dataset_res) + '.' + str(table_res),
            schema = SCHEMA_ADFImpression,
            project = str(PROJECT_ID), #options.display_data()['project'],
            create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,  # create the table if it does not exist
            write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND  # append to existing rows
        )
# https://cloud.google.com/dataflow/docs/guides/templates/creating-templates#valueprovider
class FileIterator(beam.DoFn):
    def __init__(self, files_bucket):
        self.files_bucket = files_bucket

    def process(self, element):
        files = pd.read_csv(str(element), header=None).values[0].tolist()
        bucket = self.files_bucket.get()
        files = [str(bucket) + '/' + file for file in files]
        logging.info('Files list is: {}'.format(files))
        return files

# https://stackoverflow.com/questions/58240058/ways-of-using-value-provider-parameter-in-python-apache-beam   
class OutputValueProviderFn(beam.DoFn):
    def __init__(self, vp):
        self.vp = vp

    def process(self, unused_elm):
        yield self.vp.get()


class RuntimeOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):

        parser.add_value_provider_argument(
          '--dataset',
          default='EDITED FOR PRIVACY',
          help='BQ dataset to write to',
          type=str)

        parser.add_value_provider_argument(
          '--table',
          default='EDITED FOR PRIVACY',
          required=False,
          help='BQ table to write to',
          type=str)

        parser.add_value_provider_argument(
          '--filename',
          default='EDITED FOR PRIVACY',
          help='Filename of batch file',
          type=str)

        parser.add_value_provider_argument(
          '--batch_bucket',
          default='EDITED FOR PRIVACY',
          help='Bucket for batch file',
          type=str)

        #parser.add_value_provider_argument(
        #   '--bq_schema',
          #default='gs://dataflow-samples/shakespeare/kinglear.txt',
        #  help='Schema to specify for BQ')

        #parser.add_value_provider_argument(
        #   '--schema_list',
          #default='gs://dataflow-samples/shakespeare/kinglear.txt',
        #  help='Schema in list for processing')

        parser.add_value_provider_argument(
          '--files_bucket',
          default='EDITED FOR PRIVACY',
          help='Bucket where the raw files are',
          type=str)

        parser.add_value_provider_argument(
          '--complete_batch',
          default='EDITED FOR PRIVACY',
          help='Bucket where the raw files are',
          type=str)
#=========================================================

def run():
    #====================================
    # TODO PUT AS PARAMETERS 
    #====================================
    JOB_NAME_READING = 'adf-reading'
    JOB_NAME_PROCESSING = 'adf-'

    job_name = '{}-batch--{}'.format(JOB_NAME_PROCESSING,_millis())

    pipeline_options_batch = PipelineOptions()

    runtime_options = pipeline_options_batch.view_as(RuntimeOptions)

    setup_options = pipeline_options_batch.view_as(SetupOptions)
    setup_options.setup_file  = './setup.py'
    google_cloud_options = pipeline_options_batch.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.job_name = job_name
    google_cloud_options.region = 'europe-west1'
    google_cloud_options.staging_location = GCS_STAGING_LOCATION
    google_cloud_options.temp_location = GCS_TMP_LOCATION


    #pipeline_options_batch.view_as(StandardOptions).runner = 'DirectRunner'

    # If Dataflow runner [BEGIN]
    pipeline_options_batch.view_as(StandardOptions).runner = 'DataflowRunner'
    pipeline_options_batch.view_as(WorkerOptions).autoscaling_algorithm = 'THROUGHPUT_BASED'

    #pipeline_options_batch.view_as(WorkerOptions).machine_type = 'n1-standard-96' #'n1-highmem-32' #' 
    pipeline_options_batch.view_as(WorkerOptions).max_num_workers = 10
    #  [END]

    pipeline_options_batch.view_as(SetupOptions).save_main_session = True
    #Needed this in order to pass table to BQ at runtime
    pipeline_options_batch.view_as(DebugOptions).experiments = ['use_beam_bq_sink']


    with beam.Pipeline(options=pipeline_options_batch) as pipeline_2:

        try:

            final_data = (
            pipeline_2
            |'Create empty PCollection' >> beam.Create([None])
            |'Get accepted batch file 1/2:{}'.format(OutputValueProviderFn(runtime_options.complete_batch)) >> beam.ParDo(OutputValueProviderFn(runtime_options.complete_batch))
            |'Get accepted batch file 2/2:{}'.format(OutputValueProviderFn(runtime_options.complete_batch)) >> beam.ParDo(FileIterator(runtime_options.files_bucket))
            |'Read all files' >> beam.io.ReadAllFromText(skip_header_lines=1)
            |'Process all files' >> beam.ParDo(ProcessCSV(),COLUMNS_SCHEMA_0)
            |'Format all files' >> beam.ParDo(AdfDict())
            #|'WriteToBigQuery_{}'.format('test'+str(_millis())) >> beam.io.WriteToBigQuery(
            #        #dataset= runtime_options.dataset,
            #        table = str(runtime_options.dataset) + '.' + str(runtime_options.table), 
            #        schema = SCHEMA_ADFImpression,
            #        project = pipeline_options_batch.view_as(GoogleCloudOptions).project, #options.display_data()['project'],
            #        create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,  #'CREATE_IF_NEEDED',#create if does not exist.
            #        write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND    #'WRITE_APPEND' #add to existing rows,partitoning
            #        )
            |'WriteToBigQuery' >> beam.ParDo(BQ_flexible_writer(runtime_options.dataset,runtime_options.table))
            )
        except Exception as exception:
            logging.error(exception)
            pass

Answer

Please run this with the following additional option.

--experiment=use_beam_bq_sink

Without this, Dataflow currently overrides the BigQuery sink with a native version that does not support ValueProviders.

Additionally, note that setting the dataset as a runtime parameter is not supported. Try specifying the table parameter as an entire table reference (DATASET.TABLE or PROJECT:DATASET.TABLE) instead.
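
Putting both points together, a minimal sketch of a working setup could look like the following (a hypothetical standalone example with placeholder rows and a placeholder schema). The full DATASET.TABLE reference travels in a single ValueProvider argument, and beam.io.WriteToBigQuery is applied as a regular transform rather than being constructed inside a DoFn:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class MyRuntimeOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # One runtime parameter holding the entire table reference,
        # e.g. 'my_dataset.my_table' or 'my-project:my_dataset.my_table'.
        parser.add_value_provider_argument(
            '--table', type=str, help='BQ table to write to (DATASET.TABLE)')

# Run with --experiment=use_beam_bq_sink (plus the usual Dataflow options).
options = PipelineOptions()
runtime_options = options.view_as(MyRuntimeOptions)

with beam.Pipeline(options=options) as p:
    (p
     | 'Create rows' >> beam.Create([{'field_1': 'a', 'field_2': 1}])
     # Pass the ValueProvider itself; the sink calls .get() at runtime.
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         table=runtime_options.table,
         schema='field_1:STRING,field_2:INTEGER',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))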
