How to mark an Airflow DAG run as failed if any task fails?

Question

Is it possible to make an Airflow DAG fail if any task fails?

I usually have some cleanup tasks at the end of a DAG, and as it is now, the whole DAG run is marked as a success whenever the last task succeeds.

Recommended answer

Another solution is to add a final PythonOperator that checks the status of all tasks in this run:

from airflow.operators.python_operator import PythonOperator
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule


def final_status(**kwargs):
    # Look at every task instance in this DAG run (except this one);
    # if any of them did not succeed, raise to fail this task and the run.
    for task_instance in kwargs['dag_run'].get_task_instances():
        if task_instance.current_state() != State.SUCCESS and \
                task_instance.task_id != kwargs['task_instance'].task_id:
            raise Exception("Task {} failed. Failing this DAG run".format(task_instance.task_id))


# The callable must be defined before the operator references it.
final_status = PythonOperator(
    task_id='final_status',
    provide_context=True,
    python_callable=final_status,
    trigger_rule=TriggerRule.ALL_DONE,  # Ensures this task runs even if upstream fails
    dag=dag,
)
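
For this check to fire only after everything else has finished, final_status has to sit downstream of every other task in the DAG. A minimal sketch of the wiring, using two hypothetical upstream tasks named extract and cleanup as stand-ins for whatever your DAG actually runs:

from airflow.operators.dummy_operator import DummyOperator

# Hypothetical placeholder tasks; replace with the real tasks of your DAG.
extract = DummyOperator(task_id='extract', dag=dag)
cleanup = DummyOperator(task_id='cleanup', dag=dag)

# Every task is set upstream of final_status, so with TriggerRule.ALL_DONE
# the status check runs once the whole run has finished, even after failures.
[extract, cleanup] >> final_status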
