使用Airflow BigqueryOperator向BigQuery表添加标签 [英] Add labels to BigQuery tables using Airflow BigqueryOperator

查看:36
本文介绍了使用Airflow BigqueryOperator向BigQuery表添加标签的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我必须向BigQuery表添加标签。我知道可以通过BigQuery UI完成此操作,但如何通过气流操作符完成此操作。

使用案例:用于计费和搜索。由于多个团队在同一项目和数据集下工作,因此我们需要将各自团队创建的所有表组合在一起。由于每个团队对表有不同的标签,因此标签对我们是必需的。

bq_query = BigQueryOperator(bql=sql,
                            destination_dataset_table='my_dataset.my_table'),
                            task_id='bq_query',
                            bigquery_conn_id='my_bq_connection',
                            use_legacy_sql=False,
                            write_disposition='WRITE_TRUNCATE',
                            create_disposition='CREATE_IF_NEEDED',
                            query_params={})

在这里,我想将标签添加到此目标表destination_dataset_table='my_dataset.my_table'

我确实尝试了一下,如此链接所述:https://airflow.apache.org/docs/apache-airflow/1.10.12/_api/airflow/contrib/operators/bigquery_operator/index.html#airflow.contrib.operators.bigquery_operator.BigQueryOperator.template_fields

from airflow import models, DAG
from airflow.contrib.operators import bigquery_operator, bigquery_to_gcs, bigquery_table_delete_operator
from airflow.operators.python_operator import PythonOperator



# Define Airflow DAG
with dag:

    bq_query = BigQueryOperator(sql='<some query>',
                                destination_dataset_table='my_dataset.my_table'),
                                task_id='bq_query',
                                use_legacy_sql=False,
                                write_disposition='WRITE_TRUNCATE',
                                create_disposition='CREATE_IF_NEEDED',
                                labels={'project_id': 'project_name', 'dag_id': 'dag_name', 'task_id': 'task_name'}
                                query_params={})

编辑

注意:根据文档,BigqueryOperator和BigqueryCreateExternalTableOperator的labels 定义不同。

BigqueryOperator的标签定义为a dictionary containing labels for the job/query, passed to BigQuery,运行正常。但是我想在创建表时向其添加标签。

我们广泛使用BigqueryOperator,无法使用BigqueryCreateExternalTableOperator

是否可以使用BigqueryOperator?或解决办法

对于您正在使用的类airflow.contrib.operators.bigquery_operator.BigQueryOperator,您可以使用参数标签

推荐答案

。您可以查看支持的参数。

标签-包含作业/查询标签的字典,传递给BigQuery

classairflow.contrib.operators.bigquery_operator.BigQueryOperator(bql=None, sql=None, destination_dataset_table=None, write_disposition='WRITE_EMPTY', allow_large_results=False, flatten_results=None, bigquery_conn_id='bigquery_default', delegate_to=None, udf_config=None, use_legacy_sql=True, maximum_billing_tier=None, maximum_bytes_billed=None, create_disposition='CREATE_IF_NEEDED', schema_update_options=, query_params=None, labels=None, priority='INTERACTIVE', time_partitioning=None, api_resource_configs=None, cluster_fields=None, location=None, encryption_configuration=None, *args, **kwargs)

您可以使用此类airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator创建带有标记的表

标签-包含传递给BigQuery的表标签的字典

classairflow.contrib.operators.bigquery_operator.BigQueryOperator(bql=None, sql=None, destination_dataset_table=None, write_disposition='WRITE_EMPTY', allow_large_results=False, flatten_results=None, bigquery_conn_id='bigquery_default', delegate_to=None, udf_config=None, use_legacy_sql=True, maximum_billing_tier=None, maximum_bytes_billed=None, create_disposition='CREATE_IF_NEEDED', schema_update_options=, query_params=None, labels=None, priority='INTERACTIVE', time_partitioning=None, api_resource_configs=None, cluster_fields=None, location=None, encryption_configuration=None, *args, **kwargs)
template_fields= ['bql', 'sql', 'destination_dataset_table', 'labels']

您可以查看有关这两个类的更多information信息。

编辑

您可以看到此示例。

   bq_query = BigQueryOperator(bql=sql,
                                destination_dataset_table='my_dataset.my_table'),
                                task_id='bq_query',
                                bigquery_conn_id='my_bq_connection',
                                use_legacy_sql=False,
                                write_disposition='WRITE_TRUNCATE',
                                create_disposition='CREATE_IF_NEEDED',
                                template_fields= ['dataset_id', 'table_id', 'project_id', 'gcs_schema_object', 'labels']
                                query_params={})

有关使用方法,请参阅example

这篇关于使用Airflow BigqueryOperator向BigQuery表添加标签的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆