Airflow BigQueryOperator:如何将查询结果保存在分区表中? [英] Airflow BigQueryOperator: how to save query result in a partitioned Table?
问题描述
我有一个简单的DAG
I have a simple DAG
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
with DAG(dag_id='my_dags.my_dag') as dag:
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
sql = """
SELECT *
FROM 'another_dataset.another_table'
"""
bq_query = BigQueryOperator(bql=sql,
destination_dataset_table='my_dataset.my_table20180524'),
task_id='bq_query',
bigquery_conn_id='my_bq_connection',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
query_params={})
start >> bq_query >> end
执行 bq_query
任务时,SQL查询将保存在分片表中。我希望将其保存在每日分区表中。为此,我只将 destination_dataset_table
更改为 my_dataset.my_table $ 20180524
。执行 bq_task
时出现以下错误:
When executing the bq_query
task the SQL query gets saved in a sharded table. I want it to get saved in a daily partitioned table. In order to do so, I only changed destination_dataset_table
to my_dataset.my_table$20180524
. I got the error below when executing the bq_task
:
Partitioning specification must be provided in order to create partitioned table
如何指定BigQuery将查询结果保存到每日分区表?我的第一个猜测是在 BigQueryOperator
中使用 query_params
,但是我没有找到有关如何使用该参数。
How can I specify to BigQuery to save query result to a daily partitioned table ? my first guess has been to use query_params
in BigQueryOperator
but I didn't find any example on how to use that parameter.
编辑:
我正在使用 google -cloud == 0.27.0
python客户端...这是Prod中使用的客户端:(
I'm using google-cloud==0.27.0
python client ... and it's the one used in Prod :(
推荐答案
您首先需要创建一个空的分区目标表。请按照此处的说明进行操作:链接创建一个空的分区表
You first need to create an Empty partitioned destination table. Follow instructions here: link to create an empty partitioned table
然后再次在气流管道下运行
您可以尝试以下代码:
and then run below airflow pipeline again. You can try code:
import datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
today_date = datetime.datetime.now().strftime("%Y%m%d")
table_name = 'my_dataset.my_table' + '$' + today_date
with DAG(dag_id='my_dags.my_dag') as dag:
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
sql = """
SELECT *
FROM 'another_dataset.another_table'
"""
bq_query = BigQueryOperator(bql=sql,
destination_dataset_table={{ params.t_name }}),
task_id='bq_query',
bigquery_conn_id='my_bq_connection',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
create_disposition='CREATE_IF_NEEDED',
query_params={'t_name': table_name},
dag=dag
)
start >> bq_query >> end
所以我要做的是创建了一个动态表名变量并将其传递给BQ运算符。
So what I did is that I created a dynamic table name variable and passed to the BQ operator.
这篇关于Airflow BigQueryOperator:如何将查询结果保存在分区表中?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!