如果一个任务因触发器规则ALL_DONE而失败,则DAG标记为&QOOT;SUCCESS" [英] DAG marked as "success" if one task fails, because of trigger rule ALL_DONE

查看:30
本文介绍了如果一个任务因触发器规则ALL_DONE而失败,则DAG标记为&QOOT;SUCCESS"的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下具有3项任务的DAG:

start --> special_task --> end

中间的任务可以成功也可以失败,但是end必须始终执行(假设这是一个干净关闭资源的任务)。为此,我使用了trigger ruleALL_DONE

end.trigger_rule = trigger_rule.TriggerRule.ALL_DONE
使用它,如果special_task失败,end将被正确执行。但是,由于end是最后一个任务并成功,因此DAG始终标记为SUCCESS

如何配置我的DAG,以便在其中一个任务失败时,整个DAG标记为FAILED

要复制的示例

import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils import trigger_rule

dag = DAG(
    dag_id='my_dag',
    start_date=datetime.datetime.today(),
    schedule_interval=None
)

start = BashOperator(
    task_id='start',
    bash_command='echo start',
    dag=dag
)

special_task = BashOperator(
    task_id='special_task',
    bash_command='exit 1', # force failure
    dag=dag
)

end = BashOperator(
    task_id='end',
    bash_command='echo end',
    dag=dag
)
end.trigger_rule = trigger_rule.TriggerRule.ALL_DONE

start.set_downstream(special_task)
special_task.set_downstream(end)

This post看似相关,但答案不符合我的需要,因为下游任务end必须执行(因此必须执行trigger_rule)。

推荐答案

@JustinasMarozascomment中所述,解决方案是创建一个虚拟任务,如下所示:

dummy = DummyOperator(
    task_id='test',
    dag=dag
)

并将其下游绑定到special_task

failing_task.set_downstream(dummy)

因此,DAG标记为失败,dummy任务标记为upstream_failed

希望有现成的解决方案,但在等待解决方案之前,此解决方案可以解决问题。

这篇关于如果一个任务因触发器规则ALL_DONE而失败,则DAG标记为&QOOT;SUCCESS"的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆