How do you pass variables with BigQueryInsertJobOperator in Airflow
Question
I am trying to use BigQueryInsertJobOperator to run a query against Google BigQuery. I am trying to pass variable names into a sql file, but I am getting an error. The error differs slightly based on what I type in the sql file, but in every case it doesn't recognize the variable.
DAG
TABLE_1 = "my_table"

with DAG(
    dag_id,
    schedule_interval='@daily',
    start_date=days_ago(1),
    template_searchpath='/mypath/sql/',
    catchup=False,
) as dag:
    query_one = BigQueryInsertJobOperator(
        task_id='query_test',
        configuration={
            "query": {
                "query": "{% include 'big_insert_job_test.sql' %}",
                "defaultDataset": {
                    "datasetId": "my_dataset_id",
                    "projectId": "my_project_id"
                },
                "queryParameters": [
                    {
                        "name": "TABLE_1",
                        "parameterType": {"type": "STRING"},
                        "parameterValue": {"value": TABLE_1}
                    }
                ]
            }
        },
    )
SQL file
SELECT * FROM @TABLE_1
According to the Google docs I can pass a variable with "@" or "?". I have tried sending the variables many different ways:
SELECT * FROM {{ params.TABLE_1 }}  -- This is how it works with BigQueryExecuteQueryOperator,
                                    -- but that is deprecated, so I'm trying BigQueryInsertJobOperator

SELECT * FROM `TABLE_1`  -- just an example; I have tried every combo of "", '', {}, etc.
Answer
Parameters can be passed from your DAG to a separate sql file by using user_defined_macros={"varname": var1, "varname2": var2}. Define your variables in the top portion of the DAG as general config, and they will be available in your operators when you include a file.
import os

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset")
TABLE_1 = "table1"

dag_id = "dag1"

with models.DAG(
    dag_id,
    schedule_interval=None,  # Override to match your needs
    start_date=days_ago(1),
    tags=["example"],
    user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_1},
) as dag:
    select_query_job = BigQueryInsertJobOperator(
        task_id="select_query_job",
        configuration={
            "query": {
                "query": "{% include 'example_bigquery_query.sql' %}",
                "useLegacySql": False,
            }
        },
    )
In your sql file, enclose the variable in double curly braces.
SELECT * FROM {{ DATASET }}.{{ TABLE }}
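As an aside, the queryParameters field from the question isn't wrong in itself, it just can't do what was attempted: BigQuery's @name parameters substitute values only, never identifiers such as table or dataset names, which is why SELECT * FROM @TABLE_1 fails. A sketch of a configuration dict combining both mechanisms, assuming a hypothetical created_at column and min_date parameter:

```python
# Sketch: the table name is resolved by a Jinja macro at render time,
# while a typed value is passed safely as a named query parameter.
# "min_date" and "created_at" are hypothetical names for illustration.
configuration = {
    "query": {
        "query": (
            "SELECT * FROM {{ DATASET }}.{{ TABLE }} "
            "WHERE created_at >= @min_date"
        ),
        "useLegacySql": False,
        "parameterMode": "NAMED",
        "queryParameters": [
            {
                "name": "min_date",
                "parameterType": {"type": "DATE"},
                "parameterValue": {"value": "2021-01-01"},
            }
        ],
    }
}
```

Passing values this way (rather than interpolating them into the SQL string) also avoids quoting and injection issues for anything that is data rather than structure.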
For reference, these are based on the examples in Airflow's GitHub repository.