Getting the date of the most recent successful DAG execution
Question
I want to create a transform in Airflow that picks up all data added to my source since the last time the DAG ran, so that I can update my target table. To do this, I need to be able to get the most recent execution that was successful.
I have found this: "Apache airflow macro to get last dag run execution time", which gets me part of the way there. However, it only returns the last time the DAG executed, regardless of whether that run succeeded.
SELECT col1, col2, col3
FROM schema.table
WHERE table.updated_at > '{{ last_dag_run_execution_date(dag) }}';
If an execution fails (due to connectivity or something similar), last_dag_run_execution_date(dag) still advances, so the rows that the failed run should have processed are missed.
Ideally, this would pull the most recent non-failed execution. If anyone has ideas on how I can achieve this, please let me know.
Recommended answer
I ended up changing the function in the referenced question to use latest_execution_date, which is built into Airflow's DAG object, as follows:
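def get_last_dag_run(dag):
    # Latest execution date recorded for this DAG, or None if it has never run.
    last_dag_run = dag.latest_execution_date
    if last_dag_run is None:
        # Fallback date used before the first run exists.
        return '2013-01-01'
    else:
        return last_dag_run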
This seems to work for me so far.
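If the date must skip failed runs explicitly, one option is to filter run records by state before taking the latest date. The following is only a minimal sketch of that selection logic, not an Airflow API: RunRecord, DEFAULT_WATERMARK and last_successful_execution_date are hypothetical names, and it assumes the run history is already available as (execution date, state) pairs, for example read from Airflow's DagRun records.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable


# Hypothetical stand-in for a DAG run record (not an Airflow class).
# It assumes each run exposes an execution date and a state string.
@dataclass(frozen=True)
class RunRecord:
    execution_date: datetime
    state: str  # e.g. "success", "failed", "running"


# Same fallback date as in the answer above, used when no run has succeeded yet.
DEFAULT_WATERMARK = datetime(2013, 1, 1)


def last_successful_execution_date(
    runs: Iterable[RunRecord],
    default: datetime = DEFAULT_WATERMARK,
) -> datetime:
    """Return the latest execution date among successful runs, else the default."""
    successful_dates = [r.execution_date for r in runs if r.state == "success"]
    return max(successful_dates, default=default)


if __name__ == "__main__":
    history = [
        RunRecord(datetime(2021, 1, 1), "success"),
        RunRecord(datetime(2021, 1, 2), "failed"),
    ]
    # The failed run on 2021-01-02 is ignored, so the query would start
    # again from the successful run on 2021-01-01.
    print(last_successful_execution_date(history))
```

With a helper like this, a failed run does not move the date forward, so the next run's WHERE clause still covers the rows that the failed run missed.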