Airflow BigQueryOperator: how to save query result in a partitioned Table?


Problem description

I have a simple DAG

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator

with DAG(dag_id='my_dags.my_dag') as dag:

    start = DummyOperator(task_id='start')

    end = DummyOperator(task_id='end')
    sql = """
             SELECT *
             FROM `another_dataset.another_table`
          """
    bq_query = BigQueryOperator(bql=sql,
                                destination_dataset_table='my_dataset.my_table20180524',
                                task_id='bq_query',
                                bigquery_conn_id='my_bq_connection',
                                use_legacy_sql=False,
                                write_disposition='WRITE_TRUNCATE',
                                create_disposition='CREATE_IF_NEEDED',
                                query_params={})
    start >> bq_query >> end

When executing the bq_query task, the SQL query result gets saved in a sharded table. I want it saved in a daily partitioned table instead. To do so, I only changed destination_dataset_table to my_dataset.my_table$20180524. I got the error below when executing the bq_query task:

Partitioning specification must be provided in order to create partitioned table

How can I tell BigQuery to save the query result to a daily partitioned table? My first guess was to use query_params in BigQueryOperator, but I didn't find any example of how to use that parameter.
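For context, the `$20180524` suffix above is BigQuery's partition decorator: `table$YYYYMMDD` addresses a single daily partition. A minimal sketch of building such a decorator from a date (plain Python, no BigQuery dependency; the helper name is mine, not from any API):

```python
import datetime

def partition_decorator(table, day):
    """Build 'dataset.table$YYYYMMDD', targeting one daily partition."""
    return '{}${}'.format(table, day.strftime('%Y%m%d'))

print(partition_decorator('my_dataset.my_table', datetime.date(2018, 5, 24)))
# prints my_dataset.my_table$20180524
```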

EDIT:

I'm using the google-cloud==0.27.0 Python client ... and it's the one used in Prod :(

Answer

You first need to create an empty partitioned destination table. Follow the instructions here: link to create an empty partitioned table
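For example, a sketch using the `bq` command-line tool (assumes an authenticated Google Cloud SDK; dataset and table names are the ones from the question):

```shell
# Create an empty, day-partitioned destination table
bq mk --time_partitioning_type=DAY my_dataset.my_table
```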

and then run the airflow pipeline below again. You can try this code:

import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.dummy_operator import DummyOperator

today_date = datetime.datetime.now().strftime("%Y%m%d")
table_name = 'my_dataset.my_table' + '$' + today_date

with DAG(dag_id='my_dags.my_dag') as dag:
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')
    sql = """
             SELECT *
             FROM `another_dataset.another_table`
          """
    bq_query = BigQueryOperator(bql=sql,
                                # destination_dataset_table is a templated field,
                                # so Jinja renders {{ params.t_name }} at run time
                                destination_dataset_table='{{ params.t_name }}',
                                task_id='bq_query',
                                bigquery_conn_id='my_bq_connection',
                                use_legacy_sql=False,
                                write_disposition='WRITE_TRUNCATE',
                                create_disposition='CREATE_IF_NEEDED',
                                params={'t_name': table_name})
    start >> bq_query >> end

So what I did was create a dynamic table-name variable and pass it to the BQ operator.
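As a side note (not from the original answer): since destination_dataset_table is a templated field on BigQueryOperator, you can skip the dynamic variable and let Airflow's {{ ds_nodash }} macro pick the partition from the DAG run's execution date rather than the worker's clock at parse time. A sketch, assuming the same DAG and sql as above:

```python
# Hypothetical variant: the run's execution date selects the partition.
bq_query = BigQueryOperator(
    task_id='bq_query',
    bql=sql,  # same query as above
    destination_dataset_table='my_dataset.my_table${{ ds_nodash }}',
    bigquery_conn_id='my_bq_connection',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    create_disposition='CREATE_IF_NEEDED',
)
```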

