Airflow: Dag scheduled twice a few seconds apart

Problem description

I am trying to run a DAG only once a day, at 00:15:00 (15 minutes past midnight), yet it's being scheduled twice, a few seconds apart.

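from datetime import timedelta

import airflow.utils.dates
from airflow import DAG

# default_args is defined elsewhere in the DAG file (not shown in the question).
dag = DAG(
    'my_dag',
    # retries is a task-level setting, so it belongs in default_args
    # rather than being passed to DAG() directly.
    default_args=dict(default_args, retries=3),
    start_date=airflow.utils.dates.days_ago(1) - timedelta(minutes=10),
    # Five-field cron (minute hour day-of-month month day-of-week): daily at 00:15.
    # The question had '15 0 * * * *'; croniter reads a sixth field as seconds.
    schedule_interval='15 0 * * *',
    concurrency=1,
    max_active_runs=1,
    catchup=False,
)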

The main goal of this DAG is to check for new emails, then check for new files in an SFTP directory, and then run a "merger" task that adds those new files to a database.

All the jobs are Kubernetes pods:

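# Import path for Airflow 1.10; in Airflow 2 the operator ships in the
# apache-airflow-providers-cncf-kubernetes package.
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

email_check = KubernetesPodOperator(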
    namespace='default',
    image="g.io/email-check:0d334adb",
    name="email-check",
    task_id="email-check",
    get_logs=True,
    dag=dag,
)
sftp_check = KubernetesPodOperator(
    namespace='default',
    image="g.io/sftp-check:0d334adb",
    name="sftp-check",
    task_id="sftp-check",
    get_logs=True,
    dag=dag,
)
my_runner = KubernetesPodOperator(
    namespace='default',
    image="g.io/my-runner:0d334adb",
    name="my-runner",
    task_id="my-runner",
    get_logs=True,
    dag=dag,
)
# my-runner starts only after both sftp-check and email-check have succeeded.
my_runner.set_upstream([sftp_check, email_check])
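The set_upstream call makes my-runner wait for both checks; in Airflow the same wiring can also be written as `[sftp_check, email_check] >> my_runner`. As a rough illustration only (a toy model using Python's standard graphlib module, Python 3.9+, not Airflow's scheduler), the resulting execution order looks like this:

```python
from graphlib import TopologicalSorter

# Toy model of the task graph above (not Airflow's scheduler):
# each key maps a task to the set of tasks it depends on, mirroring
# my_runner.set_upstream([sftp_check, email_check]).
graph = {
    "my-runner": {"sftp-check", "email-check"},
    "sftp-check": set(),
    "email-check": set(),
}

order = list(TopologicalSorter(graph).static_order())
print(order)  # the two checks (in either order), then my-runner
```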

So the issue is that there seem to be two runs of the DAG scheduled a few seconds apart. They do not run concurrently, but as soon as the first one finishes, the second one kicks off.

The problem is that the my_runner job is intended to run only once a day: it tries to create a file with the date as a suffix and throws an exception if that file already exists. So the second run always throws an exception, because the first run has already created that day's file.
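The once-a-day guard can be sketched with Python's exclusive-create file mode ("x"), which makes open() raise FileExistsError when the file is already there. The directory, the merged_<date>.txt naming and the write_daily_file helper are hypothetical; the question only says the job creates a file with the date as a suffix and throws if it exists.

```python
import os
import tempfile
from datetime import date


def write_daily_file(directory: str, day: date) -> str:
    """Create <directory>/merged_<YYYY-MM-DD>.txt; fail if it already exists."""
    path = os.path.join(directory, f"merged_{day.isoformat()}.txt")
    # Mode "x" means exclusive creation: open() raises FileExistsError
    # instead of overwriting, mirroring the once-a-day guard in my_runner.
    with open(path, "x") as f:
        f.write("merged\n")
    return path


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as workdir:
        today = date(2020, 6, 10)  # hypothetical run date
        print(write_daily_file(workdir, today))  # first run: file created
        try:
            write_daily_file(workdir, today)  # second run on the same day
        except FileExistsError as exc:
            print(f"second run failed: {exc}")
```

This is why a duplicate run on the same day fails deterministically rather than silently overwriting the first run's output.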

Since an image (or two) is worth a thousand words, here goes:

You'll see that there's a first run scheduled "22 seconds after 00:15" (that's fine; sometimes it varies by a couple of seconds) and then a second one that always seems to be scheduled "58 seconds after 00:15 UTC" (at least according to the name it gets). The first one runs fine, and nothing else seems to be running. But as soon as it finishes, the second run (the one scheduled at 00:15:58) starts, and fails.

[Screenshot: a "good" run]

[Screenshot: a "bad" run]

Recommended answer

It looks like setting the start_date to 2 days ago instead of 1 did the trick:

dag = DAG(
    'my_dag',
    ...
    start_date=airflow.utils.dates.days_ago(2),
    ...
)

I don't know why.

I just have a theory. Maaaaaaybe (big maybe) the issue was that .days_ago(...) sets a UTC datetime with hour/minute/second set to 0 and then subtracts however many days the argument indicates, so saying "one day ago", or even "one day and 10 minutes ago", didn't push the start_date past the next period boundary (00:15), and that was somehow confusing Airflow?
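For reference, days_ago(n) in Airflow 1.10 takes the current UTC time, sets hour/minute/second/microsecond to 0 and subtracts n days. The hypothetical days_ago_sketch below mimics that with only the standard library, pinned to an assumed "now", to show what the question's start_date actually evaluates to.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def days_ago_sketch(n: int, now: Optional[datetime] = None) -> datetime:
    """Hypothetical stand-in for days_ago(n): today's midnight UTC minus n days."""
    if now is None:
        now = datetime.now(timezone.utc)
    midnight = now.astimezone(timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    return midnight - timedelta(days=n)


if __name__ == "__main__":
    # Pin "now" to a hypothetical moment so the output is reproducible.
    now = datetime(2020, 6, 10, 13, 37, tzinfo=timezone.utc)
    print(days_ago_sketch(1, now))                          # 2020-06-09 00:00:00+00:00
    print(days_ago_sketch(1, now) - timedelta(minutes=10))  # 2020-06-08 23:50:00+00:00
    print(days_ago_sketch(2, now))                          # 2020-06-08 00:00:00+00:00
```

So the question's start_date lands at 23:50 two days before "today", shortly before the 00:15 tick of the previous day.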

Let's Repeat That: "The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period."

https://airflow.readthedocs.io/en/stable/scheduler.html#scheduling-triggers

So the end of the period would be 00:15... If my theory is correct, using airflow.utils.dates.days_ago(1) - timedelta(minutes=16) would probably also work.
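To make the quoted rule concrete, here is a simplified model that assumes a daily '15 0 * * *' schedule and ignores catchup and the rest of the scheduler's logic. first_tick_at_or_after and first_run are hypothetical helpers, not Airflow functions, and the 2020-06-10 "today" is made up.

```python
from datetime import datetime, time, timedelta, timezone

# Simplified model (an assumption, not Airflow's real scheduler code) of the
# documented rule: a run is labelled with the START of its period (its
# execution_date) but only fires one schedule_interval later, at the END.
# The schedule is assumed to be '15 0 * * *' (daily at 00:15 UTC); catchup
# handling and other scheduler details are ignored.
SCHEDULE_TIME = time(0, 15, tzinfo=timezone.utc)
INTERVAL = timedelta(days=1)


def first_tick_at_or_after(dt: datetime) -> datetime:
    """Return the first 00:15 UTC schedule tick at or after the UTC datetime dt."""
    tick = datetime.combine(dt.date(), SCHEDULE_TIME)
    return tick if tick >= dt else tick + INTERVAL


def first_run(start_date: datetime) -> tuple:
    """Return (execution_date, fire_time) of the first run for start_date."""
    execution_date = first_tick_at_or_after(start_date)
    return execution_date, execution_date + INTERVAL


if __name__ == "__main__":
    # Hypothetical "today" of 2020-06-10, so days_ago(1) is 2020-06-09 00:00 UTC.
    yesterday = datetime(2020, 6, 9, tzinfo=timezone.utc)
    candidates = {
        "days_ago(1) - 10 min": yesterday - timedelta(minutes=10),
        "days_ago(1) - 16 min": yesterday - timedelta(minutes=16),
        "days_ago(2)": yesterday - INTERVAL,
    }
    for label, start in candidates.items():
        execution_date, fires_at = first_run(start)
        print(f"{label:22} execution_date={execution_date:%Y-%m-%d %H:%M} "
              f"fires={fires_at:%Y-%m-%d %H:%M}")
```

Under this model the -10 and -16 minute offsets land on the same execution date (2020-06-09 00:15, firing 2020-06-10 00:15), while days_ago(2) moves the first period back a day, so the model on its own does not distinguish the two offsets.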

This doesn't explain why, if I set a date very far in the past, it just doesn't run, though. ¯\_(ツ)_/¯
