Airflow sql_path not able to read the sql files when passed as Jinja Template Variable
Question
I am trying to use a Jinja template variable in place of Variable.get('sql_path'), so as to avoid hitting the database on every scan of the DAG file.
Original code
import datetime
import os
from functools import partial
from datetime import timedelta

from airflow.models import DAG, Variable
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
from alerts.email_operator import dag_failure_email

SNOWFLAKE_CONN_ID = 'etl_conn'

tmpl_search_path = []
for subdir in ['business/', 'audit/', 'business/transform/']:
    tmpl_search_path.append(os.path.join(Variable.get('sql_path'), subdir))


def get_db_dag(
    *,
    dag_id,
    start_date,
    schedule_interval,
    max_taskrun,
    max_dagrun,
    proc_nm,
    load_sql
):
    default_args = {
        'owner': 'airflow',
        'start_date': start_date,
        'provide_context': True,
        'execution_timeout': timedelta(minutes=max_taskrun),
        'retries': 0,
        'retry_delay': timedelta(minutes=3),
        'retry_exponential_backoff': True,
        'email_on_retry': False,
    }

    dag = DAG(
        dag_id=dag_id,
        schedule_interval=schedule_interval,
        dagrun_timeout=timedelta(hours=max_dagrun),
        template_searchpath=tmpl_search_path,
        default_args=default_args,
        max_active_runs=1,
        catchup='{{var.value.dag_catchup}}',
        on_failure_callback=alert_email_callback,
    )

    load_table = SnowflakeOperator(
        task_id='load_table',
        sql=load_sql,
        snowflake_conn_id=SNOWFLAKE_CONN_ID,
        autocommit=True,
        dag=dag,
    )

    return dag


# ======== DAG DEFINITIONS #
edw_table_A = get_db_dag(
    dag_id='edw_table_A',
    start_date=datetime.datetime(2020, 5, 21),
    schedule_interval='0 5 * * *',
    max_taskrun=3,  # Minutes
    max_dagrun=1,   # Hours
    load_sql='recon/extract.sql',
)
When I replaced Variable.get('sql_path') with the Jinja template '{{var.value.sql_path}}' as below and ran the DAG, it threw the error below; it was not able to find the file in the subdirectory of the SQL folder:
tmpl_search_path = []
for subdir in ['bus/', 'audit/', 'business/snflk/']:
    tmpl_search_path.append(os.path.join('{{var.value.sql_path}}', subdir))
Got the following error: jinja2.exceptions.TemplateNotFound: extract.sql
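To see why Jinja2 reports TemplateNotFound here, note that nothing renders the placeholder at DAG-parse time: os.path.join just concatenates the literal braces, so the search path names a directory that does not exist on disk. A minimal stdlib-only sketch:

```python
import os

# The Jinja placeholder is never rendered when the DAG file is parsed,
# so os.path.join treats it as an ordinary string.
path = os.path.join('{{var.value.sql_path}}', 'bus/')
print(path)  # {{var.value.sql_path}}/bus/

# No such directory exists, so Jinja2's file loader cannot find extract.sql.
print(os.path.isdir(path))  # False
```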
Answer
Templates are not rendered everywhere in a DAG script. Usually they are rendered only in the templated parameters of operators. So unless you pass the elements of tmpl_search_path to some templated parameter, '{{var.value.sql_path}}' will not be rendered.
The template_searchpath of DAG is not templated. That is why you cannot pass Jinja templates to it.
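As an illustration of the distinction (a toy model, not Airflow's actual implementation): operators list their renderable parameters in a template_fields attribute, and only those attributes are passed through Jinja at runtime; everything else, including anything given to the DAG constructor, keeps its literal value.

```python
# Toy model of operator templating: only attributes named in
# template_fields are rendered; all other attributes stay literal.
class FakeOperator:
    template_fields = ('sql',)

    def __init__(self, sql, searchpath):
        self.sql = sql
        self.searchpath = searchpath  # not in template_fields, never rendered

    def render(self, context):
        for field in self.template_fields:
            value = getattr(self, field)
            # Stand-in for Jinja rendering: substitute known variables.
            for key, val in context.items():
                value = value.replace('{{%s}}' % key, val)
            setattr(self, field, value)


op = FakeOperator(sql='{{var.value.sql_path}}/extract.sql',
                  searchpath='{{var.value.sql_path}}/bus/')
op.render({'var.value.sql_path': '/opt/sql'})
print(op.sql)         # /opt/sql/extract.sql  (rendered)
print(op.searchpath)  # {{var.value.sql_path}}/bus/  (untouched)
```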
The options I can think of are:
- Hardcode the value of Variable.get('sql_path') in the pipeline script.
- Save the value of Variable.get('sql_path') in a configuration file and read it from there in the pipeline script.
- Move the Variable.get() call out of the for-loop. This will result in three times fewer requests to the database.
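The third option can be sketched as follows. Here get_variable is a hypothetical stand-in for airflow.models.Variable.get (which performs the metadata-DB query), so the snippet stays runnable outside Airflow:

```python
import os


def get_variable(key):
    """Hypothetical stand-in for airflow.models.Variable.get,
    which hits the metadata database once per call."""
    return {'sql_path': '/opt/airflow/sql'}[key]


# Fetch the Variable once, outside the loop: one DB round-trip per
# DAG-file parse instead of one per subdirectory.
sql_path = get_variable('sql_path')

tmpl_search_path = [os.path.join(sql_path, subdir)
                    for subdir in ['business/', 'audit/', 'business/transform/']]
print(tmpl_search_path)
```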