How to find the number of upstream tasks failed in Airflow?


Problem Description

I am having a tough time figuring out how to find the failed tasks for the same DAG run when it runs twice on the same day (same execution date).

Consider an example where a DAG with dag_id=1 failed on its first run (for any reason, let's say a connection timeout) and a task failed. The TaskInstance table will contain an entry for the failed task when we query it. GREAT!!

But if I re-run the same DAG (note that dag_id is still 1), then in the last task (which has the ALL_DONE trigger rule, so it is executed regardless of whether the upstream tasks failed or succeeded) I want to count the number of tasks that failed in the current dag_run, ignoring previous dag_runs. I came across the dag_run id, which could be useful if we could relate it to TaskInstance, but I could not. Any suggestions/help is appreciated.
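For concreteness, here is a minimal sketch of the setup described above; the DAG id, task ids, and dates are made up for illustration (Airflow 1.x imports):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

# Hypothetical DAG: two upstream tasks, then a final task that runs
# no matter how the upstream tasks ended.
with DAG("example_dag", start_date=datetime(2018, 1, 1),
         schedule_interval="@daily") as dag:
    upstream_a = DummyOperator(task_id="upstream_a")  # may fail on some runs
    upstream_b = DummyOperator(task_id="upstream_b")

    # ALL_DONE: execute once all upstream tasks have finished,
    # whether they succeeded or failed.
    final = DummyOperator(task_id="final",
                          trigger_rule=TriggerRule.ALL_DONE)

    [upstream_a, upstream_b] >> final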

Recommended Answer

You could create a PythonOperator task which queries the Airflow database to find the information you're looking for. This has the added benefit of passing along the information you need to query for the data you want:

from contextlib import closing

from airflow import models, settings
from airflow.utils.state import State

def your_python_operator_callable(**context):
    # Open a session against the Airflow metadata database.
    with closing(settings.Session()) as session:
        # Count the TaskInstance rows for this DAG and execution date
        # that ended up in the FAILED state.
        failed = session.query(models.TaskInstance).filter(
            models.TaskInstance.dag_id == context["dag"].dag_id,
            models.TaskInstance.execution_date == context["execution_date"],
            models.TaskInstance.state == State.FAILED,
        ).count()
        print("There are {} failed tasks in this execution".format(failed))

Note that in Airflow, dag_id together with execution_date identifies a dag_run, which is why the query above filters on those two columns rather than joining to the DagRun table. Then add the task to your DAG with a PythonOperator.
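A minimal sketch of that wiring, assuming the dag object from the earlier sketch and a made-up task_id; provide_context=True is how Airflow 1.x passes **context into the callable (Airflow 2 does this implicitly):

from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

count_failed = PythonOperator(
    task_id="count_failed_tasks",       # hypothetical task id
    python_callable=your_python_operator_callable,
    provide_context=True,               # Airflow 1.x; implicit in Airflow 2+
    trigger_rule=TriggerRule.ALL_DONE,  # run even when upstream tasks failed
    dag=dag,                            # assumes an existing DAG object
)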

(I haven't tested the above, but hopefully it will put you on the right path.)

