Using XCom to Load Schema in Airflow with GoogleCloudStorageToBigQueryOperator

Problem description

I have an XCom associated with the Task ID database_schema stored in Airflow that is the JSON schema for a dataset sales_table that I want to load into BigQuery.

The data for the BigQuery dataset sales_table comes from a CSV file retailcustomer_data.csv stored in Google Cloud Storage. The operator for loading the data from GCS to BigQuery is as follows:

gcs_to_bigquery = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bigquery',
    bucket=bucket,
    source_objects=['retailcustomer_data.csv'],
    destination_project_dataset_table=dataset_table_name,
    schema_fields="{{task_instance.xcom_pull(task_ids='database_schema')}}",
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id=bq_connection,
    google_cloud_storage_conn_id=gcs_connection,
    dag=dag)

When the above operator runs as part of the DAG, I am getting the following error message for the gcs_to_bigquery operator. Does anyone know why the XCom associated with Task ID database_schema is not being loaded into schema_fields of the gcs_to_bigquery operator? And how does one fix this issue?

googleapiclient.errors.HttpError: <HttpError 400 when requesting https://bigquery.googleapis.com/bigquery/v2/projects/city_retail_project/jobs?alt=json returned "Invalid value at 'job.configuration.load.schema.fields' (type.googleapis.com/google.cloud.bigquery.v2.TableFieldSchema), "{{task_instance.xcom_pull(task_ids='database_schema')}}"">


Recommended answer


    • The field schema_fields of GoogleCloudStorageToBigQueryOperator is NOT included in template_fields
    • So what you are trying will NOT work
    • ...

      Quoting Gtoonstra:


      Not all parameters in operators are templated, so you cannot use Jinja templates everywhere. The Jinja templates only work for those fields in operators where it’s listed in the template_fields list inside the source file, like:

      template_fields = ('audit_key', 'cycle_dtm')
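
To see why the error message contains the template text verbatim, here is a toy model of the rule quoted above, in plain Python with no Airflow or Jinja dependency. It is a sketch under stated assumptions, not Airflow's code: the regex-based render() is a stand-in for Jinja, ToyOperator is hypothetical, its template_fields tuple is an assumed subset chosen only to show that schema_fields is absent, and the context dict simply maps template expressions to hypothetical values.

```python
import re

# Toy stand-in for Jinja (NOT Airflow's implementation): replaces each
# "{{ expr }}" in a string with context[expr], looked up by the literal text.
_PLACEHOLDER = re.compile(r"\{\{\s*(.*?)\s*\}\}")


def render(value, context):
    """Render a string attribute; non-strings are returned unchanged."""
    if not isinstance(value, str):
        return value
    return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)


class ToyOperator:
    # Hypothetical subset of template fields; the point is that
    # "schema_fields" is not listed, as the answer says of the real operator.
    template_fields = ("bucket", "schema_object",
                       "destination_project_dataset_table")

    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def render_template_fields(self, context):
        # Only attributes named in template_fields are rendered.
        for name in self.template_fields:
            if hasattr(self, name):
                setattr(self, name, render(getattr(self, name), context))


XCOM_TEMPLATE = "{{task_instance.xcom_pull(task_ids='database_schema')}}"
context = {
    "bucket_name": "my-bucket",  # hypothetical bucket name
    "task_instance.xcom_pull(task_ids='database_schema')":
        '[{"name": "customer_id", "type": "STRING"}]',  # hypothetical schema
}

op = ToyOperator(bucket="{{ bucket_name }}", schema_fields=XCOM_TEMPLATE)
op.render_template_fields(context)
print(op.bucket)         # my-bucket
print(op.schema_fields)  # {{task_instance.xcom_pull(task_ids='database_schema')}}
```

Because schema_fields is skipped during rendering, the value handed on for execution is the unrendered template string, which matches what BigQuery rejected in the error from the question.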
      
      Possible workarounds


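      • Use the schema_object field instead: it is templated, but it takes the path of a JSON schema file in GCS rather than the schema itself, so the task that produces the schema would need to write it to GCS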

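      • Extend the operator and define your own custom template fields/logic (for example, add 'schema_fields' to template_fields)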
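
For the second workaround, note that even once schema_fields is templated, Jinja produces a string, while the operator expects a list of field dicts. A minimal sketch, assuming a hypothetical subclass that appends 'schema_fields' to template_fields and, in an overridden execute(), converts the rendered value with a helper like the one below before calling super().execute(context). The helper name schema_fields_from_rendered is hypothetical; it accepts either a JSON string (what you get if database_schema pushed JSON text) or a Python-repr string (what Jinja's string conversion yields if it pushed a list).

```python
import ast
import json


def schema_fields_from_rendered(rendered):
    """Convert a rendered template back into a BigQuery schema_fields list.

    Jinja renders to text. If the upstream task pushed a JSON string, the
    text is JSON; if it pushed a Python list, the text is a Python repr
    (single quotes), which json.loads rejects.
    """
    if not isinstance(rendered, str):
        return rendered  # already a native object, e.g. a list
    try:
        fields = json.loads(rendered)
    except json.JSONDecodeError:
        fields = ast.literal_eval(rendered)  # fallback for Python-repr text
    # Basic shape check: a list of dicts that each have "name" and "type".
    if not isinstance(fields, list) or not all(
        isinstance(f, dict) and "name" in f and "type" in f for f in fields
    ):
        raise ValueError("expected a list of {'name': ..., 'type': ...} dicts")
    return fields


# Both renderings of the same (hypothetical) schema give the same list.
print(schema_fields_from_rendered('[{"name": "customer_id", "type": "STRING"}]'))
print(schema_fields_from_rendered("[{'name': 'customer_id', 'type': 'STRING'}]"))
```

On Airflow 2.1 and later, creating the DAG with render_template_as_native_obj=True makes templates render to native Python objects, which can make this conversion unnecessary.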
