Airflow: DAG run with execution_date = trigger_date = fixed_schedule
Question
In Airflow, I would like to run a DAG each Monday at 8 am (the execution_date should of course be "current Monday, 8 am"). The relevant parameters for this workflow are:
- start_date: "2018-03-19"
- schedule_interval: "0 8 * * MON"
I expect to see a DAG run every Monday at 8 am, the first one on 2018-03-19 at 8 am with execution_date = 2018-03-19-08-00-00, and so on each Monday.
However, that is not what happens: the DAG is not started on 2018-03-19 at 8 am. The real behaviour is explained here, for example: https://stackoverflow.com/a/39620901/1510109 or https://stackoverflow.com/a/48213964/1510109 The behaviour is: at the end of each interval (weekly in my case), the DAG is run with execution_date = beginning of the interval (i.e. the previous week). This behaviour is apparently motivated by an "ETL way of thinking" (see the links above), but it is absolutely not what I want.
How can I run my DAG each Monday at 08:00 am with execution_date = trigger_date = now (= current Monday, 8 am)?
Thanks
Recommended answer
Take a quick look at my answer with start times and execution_date examples.
You want to run every Monday at 8am.
So this part is going to stay the same:
schedule_interval: '0 8 * * MON',
You want its first run to happen on 2018-03-19. Since the first run occurs at the end of the first full schedule period after the start date, you should change your start date to:
start_date: datetime(2018, 3, 12),
You will have to live with the fact that Airflow names your DagRuns after the start of each period and passes in macros based on an execution_date set to the start of the interval period. Adjust your logic accordingly.
Your first run will start after 2018-03-19T08:00:00.0Z, and the execution_date, every other macro that depends on it, and the name of the DagRun will be 2018-03-12T08:00:00.0Z.
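The relationship between start date, execution_date and actual trigger time can be sketched with the standard library alone. This is a simplified model of the behaviour described above, not Airflow's scheduler code; the helper names `first_tick_on_or_after` and `first_run` are made up for illustration, and the weekly Monday 08:00 schedule is hard-coded.

```python
from datetime import datetime, timedelta

WEEK = timedelta(days=7)


def first_tick_on_or_after(start, weekday=0, hour=8):
    """Return the first datetime >= start that falls on `weekday` at `hour`:00.

    weekday=0 means Monday, matching the cron expression "0 8 * * MON".
    """
    candidate = start.replace(hour=hour, minute=0, second=0, microsecond=0)
    candidate += timedelta(days=(weekday - candidate.weekday()) % 7)
    if candidate < start:
        candidate += WEEK
    return candidate


def first_run(start):
    """Model of the first DagRun: (execution_date, time the run is triggered).

    The run is labelled with the start of the interval and only fires
    once that interval has fully elapsed.
    """
    execution_date = first_tick_on_or_after(start)
    triggered_at = execution_date + WEEK
    return execution_date, triggered_at


# start_date from the question: the first run fires a week later than hoped.
print(first_run(datetime(2018, 3, 19)))
# start_date from this answer: the first run fires on 2018-03-19 08:00.
print(first_run(datetime(2018, 3, 12)))
```

Under this model, moving start_date back by one interval is what makes the first trigger land on 2018-03-19, while the run itself is still labelled 2018-03-12.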
So long as you understand what to expect from the execution_date and you don't try to base your time off of datetime.now(), your DAGs will be able to be idempotent in operation. Feel free to make a new variable like my_execution_date = execution_date + datetime.timedelta(7) within any PythonOperator or custom operator (you get execution_date from the context of the task), use template statements like {{ (execution_date + macros.timedelta(7)).strftime('%Y%m%d') }} or {{ macros.ds_add(ds, 7) }}, or use the next_execution_date.
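As a rough illustration of the shifting described above, the sketch below shows a callable that derives the "current Monday" from the interval start. The function name `week_end_label` is hypothetical, and how the callable receives `execution_date` (through the task context) depends on your Airflow version. The local `ds_add` is only a standard-library stand-in that mirrors what {{ macros.ds_add(ds, 7) }} computes; it is not Airflow's implementation.

```python
from datetime import datetime, timedelta


def week_end_label(execution_date, **context):
    """Hypothetical python_callable: shift the interval start by one week.

    Uses the execution_date handed in by the scheduler rather than
    datetime.now(), so re-running an old DagRun yields the same result.
    """
    my_execution_date = execution_date + timedelta(7)
    return my_execution_date.strftime("%Y%m%d")


def ds_add(ds, days):
    """Stand-in for the ds_add macro: add days to a YYYY-MM-DD string."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


# The DagRun labelled 2018-03-12 08:00 is the one that fires on 2018-03-19.
print(week_end_label(datetime(2018, 3, 12, 8)))
print(ds_add("2018-03-12", 7))
```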
You can even add a DAG-level user_defined_macros like {'dt': lambda d: d + datetime.timedelta(days=7)} to enable {{ dt(execution_date) }}. Recently user_defined_filters were also added, so the same kind of dict, {'dt': lambda d: d + datetime.timedelta(days=7)}, enables {{ execution_date | dt }}. The next_ds and next_execution_date would be easier for your purposes.
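A minimal sketch of the two dicts follows, written without importing Airflow so it can be run on its own. The shared helper name `dt` comes from the answer. Passing the dicts to the DAG constructor is shown only as a comment, on the assumption that your Airflow version accepts both keyword arguments.

```python
from datetime import datetime, timedelta


def dt(d):
    """Shift a datetime forward by one schedule interval (7 days)."""
    return d + timedelta(days=7)


# Callable from templates as {{ dt(execution_date) }}.
user_defined_macros = {"dt": dt}
# Usable from templates as {{ execution_date | dt }}.
user_defined_filters = {"dt": dt}

# Assumed wiring, not executed here:
# DAG(..., user_defined_macros=user_defined_macros,
#          user_defined_filters=user_defined_filters)

# Exercising the helper directly with the interval start from the answer:
print(user_defined_macros["dt"](datetime(2018, 3, 12, 8)))
```

Using one named function for both dicts keeps the macro and the filter in agreement if the interval ever changes.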
While thinking about templating, you may as well read up on the built-in stuff out there: http://jinja.pocoo.org/docs/2.10/templates/#builtin-filters