Getting the date of the most recent successful DAG execution


Question


I am looking to create a transform in Airflow, and I want to make sure I get all data from my source since the last time the DAG ran, in order to update my target table. To do this, I want to be able to get the date of the most recent successful execution.


I have found this: Apache airflow macro to get last dag run execution time, which gets me part of the way to the end goal. However, it only gets the last time the DAG executed, regardless of whether that execution succeeded.

SELECT col1, col2, col3
FROM schema.table
WHERE table.updated_at > '{{ last_dag_run_execution_date(dag) }}';


If an execution fails (due to connectivity issues or something similar), last_dag_run_execution_date(dag) still advances to that failed run's date, so the next run skips the data the failed run should have loaded.
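To make the failure mode concrete, here is a minimal, Airflow-free Python sketch. The run history, the source rows and the helper names (last_run_date, last_successful_run_date, rows_to_load) are all hypothetical; run states are plain strings standing in for Airflow's run states, and rows_to_load mimics the `updated_at > watermark` filter from the query above.

```python
from datetime import datetime

# Hypothetical run history: (execution_date, state), oldest first.
RUNS = [
    (datetime(2020, 1, 1), "success"),
    (datetime(2020, 1, 2), "failed"),
]

# Hypothetical source rows: (id, updated_at).
ROWS = [
    (1, datetime(2019, 12, 31, 12)),
    (2, datetime(2020, 1, 1, 12)),
    (3, datetime(2020, 1, 2, 12)),
]


def last_run_date(runs):
    """Most recent execution date, whatever the run's state."""
    return max(date for date, _ in runs)


def last_successful_run_date(runs, default):
    """Most recent execution date among successful runs, or `default`."""
    dates = [date for date, state in runs if state == "success"]
    return max(dates, default=default)


def rows_to_load(rows, watermark):
    """Python equivalent of `WHERE updated_at > watermark`."""
    return [row_id for row_id, updated_at in rows if updated_at > watermark]


naive = rows_to_load(ROWS, last_run_date(RUNS))
safe = rows_to_load(ROWS, last_successful_run_date(RUNS, datetime(2013, 1, 1)))
print(naive)  # [3]
print(safe)   # [2, 3]
```

With the watermark taken from the failed run, row 2 (updated between the successful run and the failed one) is never loaded; using the last successful run keeps it in the next load.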


Ideally, this would return the most recent non-failed execution. Alternatively, if anyone has other ideas for how I can achieve this, please let me know.

Answer


I ended up changing the function in the referenced question to use latest_execution_date, a property of Airflow's DAG object, as follows:

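def get_last_dag_run(dag):
    # latest_execution_date is None when the DAG has no runs yet
    last_dag_run = dag.latest_execution_date
    if last_dag_run is None:
        # Fall back to an arbitrary early date for the very first run
        return '2013-01-01'
    else:
        return last_dag_run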

This seems to work for me for now.
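Two details are worth noting about the function above. It returns a datetime in one branch and a string in the other, and, as far as I can tell from the Airflow source, DAG.latest_execution_date is the newest execution date among all existing runs of the DAG, not only the successful ones. A hedged variant that filters by state and always returns an ISO-8601 string might look like the sketch below. It is not the answer author's code: get_last_successful_run is a hypothetical name, and dag_runs stands for any iterable of objects with execution_date and state attributes (in Airflow, DagRun.find(dag_id=..., state=...) returns such objects, but check the signature for your version). The default "success" matches the string value of Airflow's success state.

```python
from datetime import datetime
from types import SimpleNamespace

# Arbitrary early fallback, mirroring the '2013-01-01' in the answer.
DEFAULT_START = datetime(2013, 1, 1)


def get_last_successful_run(dag_runs, success_state="success"):
    """Return the newest successful execution date as an ISO-8601 string.

    Falls back to DEFAULT_START when no successful run exists, so the
    SQL template always receives a value of the same type and format.
    """
    dates = [run.execution_date for run in dag_runs if run.state == success_state]
    return max(dates, default=DEFAULT_START).isoformat()


# Stand-ins for DagRun objects (hypothetical data).
runs = [
    SimpleNamespace(execution_date=datetime(2020, 1, 1), state="success"),
    SimpleNamespace(execution_date=datetime(2020, 1, 2), state="failed"),
]
print(get_last_successful_run(runs))  # 2020-01-01T00:00:00
print(get_last_successful_run([]))    # 2013-01-01T00:00:00
```

To call something like this from a SQL template, it would be registered on the DAG, for example through the user_defined_macros argument of DAG(...), wrapped so that it looks up the DAG's runs before filtering.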
