Schedule a DAG in Airflow to run every 5 minutes, starting from today, i.e., 2019-12-18

Question

I am trying to run a DAG every 5 minutes starting from today (2019-12-18). I defined my start date as start_date: dt.datetime(2019, 12, 18, 10, 00, 00) and my schedule interval as schedule_interval='*/5 * * * *'. When I start the airflow scheduler, I don't see any of my tasks running.

But when I change the start date to start_date: dt.datetime(2019, 12, 17, 10, 00, 00), i.e., yesterday's date, the DAG runs continuously, roughly every 10 seconds instead of every 5 minutes.

I think the solution to this problem is to set the start_date correctly, but I could not find the right way to do it. Please help me!

Here is my code.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
import datetime as dt
from airflow.operators.python_operator import PythonOperator

def print_world():
   print('world')


default_args = {
    'owner': 'bhanuprakash',
    'depends_on_past': False,
    'start_date': dt.datetime(2019, 12, 18, 10, 00, 00),
    'email': ['bhanuprakash.uchula@techwave.net'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5)
}

with DAG('dag_today',
    default_args=default_args,
    schedule_interval= '*/5 * * * *'
    ) as dag:


    print_hello = BashOperator(task_id='print_hello',
        bash_command='gnome-terminal')


    sleep = BashOperator(task_id='sleep',
        bash_command='sleep 5')


    print_world = PythonOperator(task_id='print_world',
        python_callable=print_world)

print_hello >> sleep >> print_world

Recommended answer

The datetime object you are passing to Airflow isn't timezone-aware. Airflow uses UTC internally, so the naive datetime object you are passing may not be aligned with the scheduler's notion of time, and this could be why the DAG isn't being scheduled to run at midnight "today" (2019-12-18).
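To illustrate the difference between naive and aware datetimes, here is a minimal sketch using only the standard library. The helper name is_aware and the fixed UTC+05:30 offset are assumptions made for the example; they are not taken from the question or from Airflow.

```python
import datetime as dt


def is_aware(value):
    """Return True if the datetime carries usable timezone information."""
    return value.tzinfo is not None and value.utcoffset() is not None


# The start date from the question: no tzinfo, so it is "naive".
naive_start = dt.datetime(2019, 12, 18, 10, 0, 0)

# Hypothetical local zone for illustration: a fixed UTC+05:30 offset.
local_tz = dt.timezone(dt.timedelta(hours=5, minutes=30))

# Attach the zone, then express the same instant in UTC.
aware_start = naive_start.replace(tzinfo=local_tz)
start_in_utc = aware_start.astimezone(dt.timezone.utc)

print(is_aware(naive_start))     # False
print(is_aware(aware_start))     # True
print(start_in_utc.isoformat())  # 2019-12-18T04:30:00+00:00
```

The same wall-clock time, 10:00, lands at a different UTC instant depending on the zone attached to it, which is why a naive start date can differ from what you expect by several hours.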

Instead of passing a naive datetime object like this:

'start_date': dt.datetime(2019, 12, 18, 10, 00, 00)

Try using pendulum to make your DAG timezone-aware:

import pendulum

...
# Replace 'YOUR TIMEZONE' with a tz database name; see https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
# in_timezone() accepts a timezone name, whereas astimezone() expects a tzinfo object.
'start_date': pendulum.datetime(year=2019, month=12, day=10).in_timezone('YOUR TIMEZONE'),

The docs (https://airflow.apache.org/docs/stable/timezone.html) are quite useful for getting tips on how to handle datetimes in Airflow.

As for your other question on run frequency: by default, DAG runs are designed to "catch up" on all the intervals between your start date and end date. To disable this behavior, you will need to add catchup=False when instantiating your DAG.
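As a rough illustration of why the DAG appeared to run back to back with yesterday's start date, the sketch below counts how many 5-minute intervals have already elapsed. The function pending_runs is a hypothetical helper, not part of Airflow, and it simplifies the scheduler's behavior by assuming one run per completed interval and a fixed "now".

```python
import datetime as dt


def pending_runs(start, now, interval):
    """Count the complete schedule intervals between start and now."""
    if now <= start:
        return 0
    # Dividing one timedelta by another with // yields a whole number.
    return (now - start) // interval


interval = dt.timedelta(minutes=5)
now = dt.datetime(2019, 12, 18, 10, 0, 0)  # assumed current time

# Start date set to yesterday: a full day of intervals is waiting.
print(pending_runs(dt.datetime(2019, 12, 17, 10, 0, 0), now, interval))  # 288

# Start date equal to "now": no interval has completed yet.
print(pending_runs(dt.datetime(2019, 12, 18, 10, 0, 0), now, interval))  # 0
```

With catchup enabled, the scheduler works through that backlog as fast as it can, which would look like a run every few seconds. The second case also suggests why nothing runs right away when the start date has only just been reached.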

Airflow docs

Backfill and Catchup

An Airflow DAG with a start_date, possibly an end_date, and a schedule_interval defines a series of intervals which the scheduler turns into individual DAG Runs and executes. A key capability of Airflow is that these DAG Runs are atomic, idempotent items, and the scheduler, by default, will examine the lifetime of the DAG (from start to end/now, one interval at a time) and kick off a DAG Run for any interval that has not been run (or has been cleared). This concept is called Catchup.

If your DAG is written to handle its own catchup (i.e., not limited to the interval, but instead to "Now", for instance), then you will want to turn catchup off, either on the DAG itself with dag.catchup = False, or by default at the configuration file level with catchup_by_default = False. This instructs the scheduler to only create a DAG Run for the most current instance of the DAG interval series.
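Applied to the DAG from the question, turning catchup off might look like the sketch below. It assumes an Airflow 1.10-style DAG constructor that accepts schedule_interval and catchup; the names dag_kwargs and build_dag are made up for this example, and Airflow is imported inside the function only so the snippet can be loaded without Airflow installed.

```python
import datetime as dt

# Keyword arguments for the DAG from the question, with catchup disabled.
dag_kwargs = {
    "dag_id": "dag_today",
    "schedule_interval": "*/5 * * * *",
    "catchup": False,  # do not backfill intervals that have already elapsed
    "default_args": {
        "owner": "bhanuprakash",
        # Naive datetime kept from the question for brevity.
        "start_date": dt.datetime(2019, 12, 18, 10, 0, 0),
        "retries": 1,
        "retry_delay": dt.timedelta(minutes=5),
    },
}


def build_dag():
    """Create the DAG; Airflow is imported lazily (assumed to be installed)."""
    from airflow import DAG

    return DAG(**dag_kwargs)
```

In a real DAG file you would assign the result to a module-level variable, for example dag = build_dag(), so that the scheduler can discover it.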

I'd suggest going over the two pages I linked to get a better intuition for the basic Airflow concepts.
