如果任何任务失败,如何将Airflow DAG运行标记为失败? [英] How to mark an Airflow DAG run as failed if any task fails?
本文介绍了如果任何任务失败,如何将Airflow DAG运行标记为失败?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
如果任何任务失败,是否有可能使Airflow DAG失败?
Is it possible to make an Airflow DAG fail if any task fails?
我通常在DAG结束时有一些清理任务,现在,每当最后一个任务成功完成时,整个DAG都被标记为成功。
I usually have some cleaning up tasks at the end of a DAG and as it is now, whenever the last task succeeds the whole DAG is marked as a success.
推荐答案
另一种解决方案是添加最终的PythonOperator检查此运行中所有任务的状态:
Another solution can be to add a final PythonOperator that checks the status of all tasks in this run:
final_status = PythonOperator(
task_id='final_status',
provide_context=True,
python_callable=final_status,
trigger_rule=TriggerRule.ALL_DONE, # Ensures this task runs even if upstream fails
dag=dag,
)
def final_status(**kwargs):
for task_instance in kwargs['dag_run'].get_task_instances():
if task_instance.current_state() != State.SUCCESS and \
task_instance.task_id != kwargs['task_instance'].task_id:
raise Exception("Task {} failed. Failing this DAG run".format(task_instance.task_id))
这篇关于如果任何任务失败,如何将Airflow DAG运行标记为失败?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文