apache beam.io.BigQuerySource use_standard_sql not working when running as dataflow runner


Problem Description

I have a dataflow job where I will read from a bigquery query first (in standard SQL). It works perfectly in direct runner mode. However, when I tried to run this dataflow in dataflow runner mode, I encountered this error:

response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Thu, 24 Dec 2020 09:28:21 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '400', 'content-length': '470', '-content-encoding': 'gzip'}>, content <{ "error": { "code": 400, "message": "Querying tables partitioned on a field is not supported in Legacy SQL: 551608533331:GoogleSearchConsole.search_query_analytics_log.", "errors": [ { "message": "Querying tables partitioned on a field is not supported in Legacy SQL: 551608533331:GoogleSearchConsole.search_query_analytics_log.", "domain": "global", "reason": "invalid" } ], "status": "INVALID_ARGUMENT" } } >

Apparently the use_standard_sql parameter doesn't work in dataflow runner mode. Versions: apache-beam 2.24.0, Python 3.8.

last_update_date = pipeline | 'Read last update date' >> beam.io.Read(beam.io.BigQuerySource(
    query='''
        SELECT
            MAX(date) AS date
        FROM
            GoogleSearchConsole.search_query_analytics_log
    ''',
    use_standard_sql=True
))
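
For reference, recent Beam releases also ship a beam.io.ReadFromBigQuery transform that supersedes the beam.io.Read(beam.io.BigQuerySource(...)) pattern. A minimal sketch of the same read, assuming your Beam version includes that transform (the query is the one from above):

last_update_date = pipeline | 'Read last update date' >> beam.io.ReadFromBigQuery(
    # Sketch only: ReadFromBigQuery availability depends on your Beam version
    query='''
        SELECT
            MAX(date) AS date
        FROM
            GoogleSearchConsole.search_query_analytics_log
    ''',
    use_standard_sql=True)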

Recommended Answer

Try the following code, which reads data from BigQuery and writes back to BigQuery. The code is an Apache Beam Dataflow runner pipeline:

#------------Import Lib-----------------------#
import argparse
import logging
import os

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

#------------Set up BQ parameters-----------------------#
# Replace with Project Id
project = 'xxxxx'

#------------Splitting Of Records------------#

def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    parser.add_argument(
          '--cur_suffix',
          dest='cur_suffix',
          help='Input table suffix to process.')
    known_args, pipeline_args = parser.parse_known_args(argv)


    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p1 = beam.Pipeline(options=pipeline_options)


    logging.info('***********')
    logging.info(known_args.cur_suffix)
    data_loading = (
        p1
        | 'ReadFromBQ' >> beam.io.Read(beam.io.BigQuerySource(
            query='''SELECT SUBSTR(_time, 1, 19) AS _time, dest
                     FROM `project.dataset.table`''',
            use_standard_sql=True))
    )

    project_id = "xxxxxxx"
    dataset_id = 'AAAAAAAA'
    table_schema_Audit = '_time:DATETIME, dest:STRING'

#---------------------Type = audit----------------------------------------------------------------------------------------------------------------------
    result = (
        data_loading
        | 'Write-Audit' >> beam.io.WriteToBigQuery(
            table='YYYYYYY',
            dataset=dataset_id,
            project=project_id,
            schema=table_schema_Audit,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))



    result = p1.run()
    result.wait_until_finish()


if __name__ == '__main__':
  # Point GOOGLE_APPLICATION_CREDENTIALS at the service account key file
  path_service_account = 'ABGFDfc927.json'
  os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_service_account
  run()
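
To submit this pipeline to the Dataflow service, the runner and its options go in through pipeline_args (parse_known_args forwards anything it doesn't recognize to PipelineOptions). A hypothetical invocation; every value below is a placeholder, not taken from the original post:

run(argv=[
    '--cur_suffix=20201224',        # placeholder value for the custom flag
    '--runner=DataflowRunner',      # switch from direct runner to Dataflow
    '--project=xxxxx',              # placeholder GCP project id
    '--region=us-central1',         # placeholder Dataflow region
    '--temp_location=gs://your-bucket/temp',        # placeholder bucket
    '--staging_location=gs://your-bucket/staging',  # placeholder bucket
])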
