气流如何设置dag_run.conf的默认值 [英] Airflow how to set default values for dag_run.conf
本文介绍了气流如何设置dag_run.conf的默认值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
dag_run.conf
的默认值。当从WebUI使用";run w/Config";选项运行DAG时,此功能非常有效。但按计划运行时,dag_run.conf
字典不存在,任务会失败,如
jinja2.exceptions.UndefinedError: 'dict object' has no attribute 'key1'
下面是示例作业。
是否可以使dag_run.conf
始终包含params
此处定义的词典?
from airflow import DAG
from airflow.utils.dates import hours_ago
from airflow.operators.bash import BashOperator
from datetime import timedelta
def do_something(val1: str, val2: str) -> str:
return f'echo vars are: "{val1}, {val2}"'
params = {
'key1': 'def1',
'key2': 'def2',
}
default_args = {
'retries': 0,
}
with DAG(
'template_test',
default_args=default_args,
schedule_interval=timedelta(minutes=1),
start_date=hours_ago(1),
params = params,
) as dag:
hello_t = BashOperator(
task_id='example-command',
bash_command=do_something('{{dag_run.conf["key1"]}}', '{{dag_run.conf["key2"]}}'),
config=params,
)
我见过的最接近的是在For Apache Airflow, How can I pass the parameters when manually trigger DAG via CLI?中,但是在那里他们利用JJJA和if/Else-这将需要定义这些默认参数两次。我只想定义它们一次。
推荐答案
您可以使用DAGparams来实现您想要的功能:
params(Dict)-可在模板中访问的DAG级别参数字典,命名空间在params下。可以在任务级别覆盖这些参数。
您可以在DAG或任务级别定义params
,也可以从触发器DAG w/config部分的UI添加或修改它们。
DAG示例:
default_args = {
"owner": "airflow",
}
dag = DAG(
dag_id="example_dag_params",
default_args=default_args,
schedule_interval="*/5 * * * *",
start_date=days_ago(1),
params={"param1": "first_param"},
catchup=False,
)
with dag:
bash_task = BashOperator(
task_id="bash_task", bash_command="echo bash_task: {{ params.param1 }}"
)
输出日志:
[2021-10-02 20:23:25,808] {logging_mixin.py:104} INFO - Running <TaskInstance: example_dag_params.bash_task 2021-10-02T23:15:00+00:00 [running]> on host worker_01
[2021-10-02 20:23:25,867] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=example_dag_params
AIRFLOW_CTX_TASK_ID=bash_task
AIRFLOW_CTX_EXECUTION_DATE=2021-10-02T23:15:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-10-02T23:15:00+00:00
[2021-10-02 20:23:25,870] {subprocess.py:52} INFO - Tmp dir root location:
/tmp
[2021-10-02 20:23:25,871] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo bash_task: first_param']
[2021-10-02 20:23:25,884] {subprocess.py:74} INFO - Output:
[2021-10-02 20:23:25,886] {subprocess.py:78} INFO - bash_task: first_param
[2021-10-02 20:23:25,887] {subprocess.py:82} INFO - Command exited with return code 0
从日志中,请注意dag_run
是计划的,并且参数仍在那里。
您可以在this answer中找到有关使用参数的更广泛示例。
希望这对您有效!
这篇关于气流如何设置dag_run.conf的默认值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文