Airflow: DAG run with execution_date = trigger_date = fixed_schedule


Question

In Airflow, I would like to run a DAG every Monday at 8 am (the execution_date should of course be "current Monday, 8 am"). The relevant parameters for this workflow are:

  • start_date:"2018-03-19"
  • schedule_interval:"0 8 * * MON"

I expect to see a DAG run every Monday at 8 am, the first one on 2018-03-19 at 8 am with execution_date = 2018-03-19-08-00-00, and so on each Monday.

However, that is not what happens: the DAG is not started on 2018-03-19 at 8 am. The real behaviour is explained here, for example: https://stackoverflow.com/a/39620901/1510109 or https://stackoverflow.com/a/48213964/1510109. At the end of each interval (weekly in my case), the DAG is run with execution_date = the beginning of the interval (i.e. the previous week). This behaviour is apparently motivated by an "ETL way of thinking" (see the links above). But it is absolutely not what I want.

How can I run my DAG each Monday at 08:00 am with execution_date = trigger_date = now (= current Monday, 8 am)?

Thanks

Recommended answer

Take a quick look at my answer with start times and execution_date examples.

You want to run every Monday at 8am.

So this part is going to stay the same:

schedule_interval: '0 8 * * MON',

You want the first run to happen on 2018-03-19. Since the first run occurs at the end of the first full schedule period after the start date, you should change your start date to:

start_date: datetime(2018, 3, 12),

You will have to live with the fact that Airflow will name your DagRuns with the start of each period and pass in macros based on the execution_date set to the start of the interval period. Adjust your logic accordingly.

Your first run will start after 2018-03-19T08:00:00.0Z, and the execution_date, every other macro that depends on it, and the name of the DagRun will be 2018-03-12T08:00:00.0Z.
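The timing described above can be modelled with a few lines of plain Python. This is only a simplified sketch of the behaviour the answer describes for a weekly "0 8 * * MON" schedule, not Airflow's scheduler code; the function name `first_weekly_run` and its parameters are hypothetical.

```python
from datetime import datetime, timedelta


def first_weekly_run(start_date, weekday=0, hour=8):
    """Model the first DagRun of a weekly schedule (default: Monday 08:00).

    Returns (execution_date, trigger_time), where execution_date is the
    start of the first full period on or after start_date and trigger_time
    is the end of that period. Simplified model, not Airflow's own code.
    """
    # First schedule tick at or after start_date.
    tick = start_date.replace(hour=hour, minute=0, second=0, microsecond=0)
    tick += timedelta(days=(weekday - tick.weekday()) % 7)
    if tick < start_date:
        tick += timedelta(days=7)
    # The run is labelled with the period start but fires at the period end.
    return tick, tick + timedelta(days=7)


# start_date from the question: the first run only fires a week later.
print(first_weekly_run(datetime(2018, 3, 19)))
# start_date from the answer: the first run fires on 2018-03-19 08:00.
print(first_weekly_run(datetime(2018, 3, 12)))
```

With the start date moved back one week, the model gives a trigger time of 2018-03-19 08:00 and an execution_date of 2018-03-12 08:00, matching the values stated in the answer.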

So long as you understand what to expect from the execution_date and you don't try to base your time off of datetime.now(), your DAGs will be able to be idempotent in operation. Feel free to make a new variable like my_execution_date = execution_date + datetime.timedelta(7) within any PythonOperator or custom operator (you get execution_date from the context of the task), use template statements like {{ (execution_date + macros.timedelta(7)).strftime('%Y%m%d') }} or {{ macros.ds_add(ds, 7) }}, or use next_execution_date.
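As a rough illustration of the date shift suggested above, the callable below derives the "current Monday" from the execution_date it receives. It is a sketch under assumptions: the callable name, the `SCHEDULE_PERIOD` constant, and the local `ds_add` helper are hypothetical, and `ds_add` is only a plain-Python stand-in for what the macros.ds_add template call computes on a 'YYYY-MM-DD' string. How the callable gets wired into a PythonOperator depends on your Airflow version and is not shown.

```python
from datetime import datetime, timedelta

# Assumption: the DAG runs weekly, so one schedule period is 7 days.
SCHEDULE_PERIOD = timedelta(days=7)


def report_for_current_monday(execution_date, **context):
    """Hypothetical task callable: shift the period start to the period end."""
    # execution_date marks the start of the interval; adding one period
    # gives the Monday on which the run is actually triggered.
    my_execution_date = execution_date + SCHEDULE_PERIOD
    return my_execution_date.strftime("%Y%m%d")


def ds_add(ds, days):
    """Plain-Python stand-in for the date arithmetic of macros.ds_add."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


print(report_for_current_monday(datetime(2018, 3, 12, 8)))
print(ds_add("2018-03-12", 7))
```

Because the shifted date is derived from execution_date rather than from datetime.now(), re-running the same DagRun later produces the same value.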

You can even add a DAG-level user_defined_macros like {'dt': lambda d: d + datetime.timedelta(days=7)} to enable {{ dt(execution_date) }}. More recently, user_defined_filters were added, so a dictionary like {'dt': lambda d: d + datetime.timedelta(days=7)} enables {{ execution_date | dt }}. The next_ds and next_execution_date macros would be easier for your purposes.

While thinking about templating, you may as well read up on the built-in filters Jinja provides: http://jinja.pocoo.org/docs/2.10/templates/#builtin-filters
