How do you pass variables with BigQueryInsertJobOperator in Airflow


Question

I am trying to use BigQueryInsertJobOperator to run a query against Google BigQuery. I am trying to pass variable names into a sql file, but I am getting an error. The error is slightly different based on what I type in the sql file, but it doesn't recognize the variable.

DAG

from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago

TABLE_1 = "my_table"

with DAG(
    dag_id,
    schedule_interval='@daily',
    start_date=days_ago(1),
    template_searchpath='/mypath/sql/',
    catchup=False,
) as dag:

    query_one = BigQueryInsertJobOperator(
        task_id='query_test',
        configuration={
            "query": {
                "query": "{% include 'big_insert_job_test.sql' %}",
                "defaultDataset": {
                    "datasetId": "my_dataset_id",
                    "projectId": "my_project_id"
                },
                "queryParameters": [
                    {
                        "name": "TABLE_1",
                        "parameterType": {
                            "type": "STRING",
                        },
                        "parameterValue": {"value": TABLE_1}
                    }
                ]
            }
        },
    )

SQL file

SELECT * FROM @TABLE_1

According to the Google docs I can pass a variable as a "@" or a "?". I have tried sending the variables many different ways:

SELECT * FROM {{params.TABLE_1}}  // This is how it works with BigQueryExecuteQueryOperator, but that is deprecated, so I am trying to use BigQueryInsertJobOperator

SELECT * FROM `TABLE_1`  // just an example; I have tried every combo of "", '', {}, etc.
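
A note on why these attempts fail: BigQuery query parameters can only substitute values, never identifiers such as table or column names, so SELECT * FROM @TABLE_1 cannot work no matter how it is quoted. Here is a minimal sketch (not from the original question; the task id, column name, and parameter name are made up for illustration) of where queryParameters do apply:

query_values_ok = BigQueryInsertJobOperator(
    task_id="query_values_ok",
    configuration={
        "query": {
            # @min_id is a value placeholder; parameters cannot replace
            # the table name in the FROM clause
            "query": "SELECT * FROM my_dataset_id.my_table WHERE id >= @min_id",
            "useLegacySql": False,  # named parameters require standard SQL
            "queryParameters": [
                {
                    "name": "min_id",
                    "parameterType": {"type": "INT64"},
                    "parameterValue": {"value": "100"},
                }
            ],
        }
    },
)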

Answer

Parameters can be passed from your DAG to a separate sql file by using user_defined_macros={"varname": var1, "varname2": var2}. So pass your variables in the top portion of the DAG for general config, and they will be available in your operators when you include a file.

import os

from airflow import models
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from airflow.utils.dates import days_ago

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
DATASET_NAME = os.environ.get("GCP_BIGQUERY_DATASET_NAME", "test_dataset")

TABLE_1 = "table1"
dag_id = "dag1"

with models.DAG(
    dag_id,
    schedule_interval=None,  # Override to match your needs
    start_date=days_ago(1),
    tags=["example"],
    user_defined_macros={"DATASET": DATASET_NAME, "TABLE": TABLE_1},
) as dag:

    select_query_job = BigQueryInsertJobOperator(
        task_id="select_query_job",
        configuration={
            "query": {
                # The sql file must be on the template search path: keep it
                # next to this DAG file or set template_searchpath on the DAG
                "query": "{% include 'example_bigquery_query.sql' %}",
                "useLegacySql": False,
            }
        },
    )

In your sql file, enclose the variables in double curly braces.

SELECT * FROM {{ DATASET }}.{{ TABLE }}
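
With the defaults above, DATASET renders as test_dataset and TABLE as table1; Jinja expands the include before the job is submitted, so BigQuery receives plain SQL. (Assuming Airflow 2.x, you can inspect the rendered query without running the job via the CLI command: airflow tasks render dag1 select_query_job <execution_date>.) The rendered result would be:

SELECT * FROM test_dataset.table1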

For reference, here are the examples from Airflow's GitHub:

Example DAG file: https://github.com/apache/airflow/blob/d87ab6d3a54cb6937cfa1771c901f34dda6a2f65/airflow/providers/google/cloud/example_dags/example_bigquery_queries.py

Example sql file:
