如果一个任务因触发器规则ALL_DONE而失败,则DAG标记为&QOOT;SUCCESS"; [英] DAG marked as "success" if one task fails, because of trigger rule ALL_DONE
本文介绍了如果一个任务因触发器规则ALL_DONE而失败,则DAG标记为&QOOT;SUCCESS";的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有以下具有3项任务的DAG:
start --> special_task --> end
中间的任务可以成功也可以失败,但是end
必须始终执行(假设这是一个干净关闭资源的任务)。为此,我使用了trigger ruleALL_DONE
:
end.trigger_rule = trigger_rule.TriggerRule.ALL_DONE
使用它,如果special_task
失败,end
将被正确执行。但是,由于end
是最后一个任务并成功,因此DAG始终标记为SUCCESS
。
如何配置我的DAG,以便在其中一个任务失败时,整个DAG标记为FAILED
?
要复制的示例
import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils import trigger_rule
dag = DAG(
dag_id='my_dag',
start_date=datetime.datetime.today(),
schedule_interval=None
)
start = BashOperator(
task_id='start',
bash_command='echo start',
dag=dag
)
special_task = BashOperator(
task_id='special_task',
bash_command='exit 1', # force failure
dag=dag
)
end = BashOperator(
task_id='end',
bash_command='echo end',
dag=dag
)
end.trigger_rule = trigger_rule.TriggerRule.ALL_DONE
start.set_downstream(special_task)
special_task.set_downstream(end)
This post看似相关,但答案不符合我的需要,因为下游任务end
必须执行(因此必须执行trigger_rule
)。
推荐答案
如@JustinasMarozas在comment中所述,解决方案是创建一个虚拟任务,如下所示:
dummy = DummyOperator(
task_id='test',
dag=dag
)
并将其下游绑定到special_task
:
failing_task.set_downstream(dummy)
因此,DAG标记为失败,dummy
任务标记为upstream_failed
。
希望有现成的解决方案,但在等待解决方案之前,此解决方案可以解决问题。
这篇关于如果一个任务因触发器规则ALL_DONE而失败,则DAG标记为&QOOT;SUCCESS";的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文