气流如何设置dag_run.conf的默认值 [英] Airflow how to set default values for dag_run.conf

查看:35
本文介绍了气流如何设置dag_run.conf的默认值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试设置一个气流DAG,该DAG提供来自dag_run.conf的默认值。当从WebUI使用";run w/Config";选项运行DAG时,此功能非常有效。但按计划运行时,dag_run.conf字典不存在,任务会失败,如

jinja2.exceptions.UndefinedError: 'dict object' has no attribute 'key1'

下面是示例作业。

是否可以使dag_run.conf始终包含params此处定义的词典?

from airflow import DAG
from airflow.utils.dates import hours_ago
from airflow.operators.bash import BashOperator
from datetime import timedelta

def do_something(val1: str, val2: str) -> str:
    return f'echo vars are: "{val1}, {val2}"'

params = {
    'key1': 'def1',
    'key2': 'def2',        
}

default_args = {
    'retries': 0,
}

with DAG(
    'template_test',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1),
    start_date=hours_ago(1),
    params = params,
) as dag:

    hello_t = BashOperator(
        task_id='example-command',
        bash_command=do_something('{{dag_run.conf["key1"]}}', '{{dag_run.conf["key2"]}}'),
        config=params,
    )

我见过的最接近的是在For Apache Airflow, How can I pass the parameters when manually trigger DAG via CLI?中,但是在那里他们利用JJJA和if/Else-这将需要定义这些默认参数两次。我只想定义它们一次。

推荐答案

您可以使用DAGparams来实现您想要的功能:

params(Dict)-可在模板中访问的DAG级别参数字典,命名空间在params下。可以在任务级别覆盖这些参数。

您可以在DAG或任务级别定义params,也可以从触发器DAG w/config部分的UI添加或修改它们。

DAG示例:

default_args = {
    "owner": "airflow",
}

dag = DAG(
    dag_id="example_dag_params",
    default_args=default_args,
    schedule_interval="*/5 * * * *",
    start_date=days_ago(1),
    params={"param1": "first_param"},
    catchup=False,
)
with dag:

    bash_task = BashOperator(
        task_id="bash_task", bash_command="echo bash_task: {{ params.param1 }}"
    )

输出日志

[2021-10-02 20:23:25,808] {logging_mixin.py:104} INFO - Running <TaskInstance: example_dag_params.bash_task 2021-10-02T23:15:00+00:00 [running]> on host worker_01
[2021-10-02 20:23:25,867] {taskinstance.py:1302} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=example_dag_params
AIRFLOW_CTX_TASK_ID=bash_task
AIRFLOW_CTX_EXECUTION_DATE=2021-10-02T23:15:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2021-10-02T23:15:00+00:00
[2021-10-02 20:23:25,870] {subprocess.py:52} INFO - Tmp dir root location: 
 /tmp
[2021-10-02 20:23:25,871] {subprocess.py:63} INFO - Running command: ['bash', '-c', 'echo bash_task: first_param']
[2021-10-02 20:23:25,884] {subprocess.py:74} INFO - Output:
[2021-10-02 20:23:25,886] {subprocess.py:78} INFO - bash_task: first_param
[2021-10-02 20:23:25,887] {subprocess.py:82} INFO - Command exited with return code 0

从日志中,请注意dag_run计划的,并且参数仍在那里。

您可以在this answer中找到有关使用参数的更广泛示例。

希望这对您有效!

这篇关于气流如何设置dag_run.conf的默认值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆