Airflow starts two DAG runs when turned on for the first time
Problem description
When I boot up the Airflow webserver and scheduler for the first time on Oct 25th at around 17:23, and turn on my DAG, I can see that it kicks off two runs for Oct 23rd and Oct 24th:
RUN 1 -> 10-23T17:23
RUN 2 -> 10-24T17:23
Here's my DAG configuration:
import datetime

from airflow import DAG

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': '2019-01-01',
    'retries': 0,
}

dag = DAG(
    'my_script',
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
    catchup=False,
)
Since it's past start_date + schedule_interval and I have set catchup=False, I would expect it to kick off a single DAG run immediately, but I would not expect it to kick off two.
- Why are two DAG runs being executed?
- How can I prevent this behaviour?
I am not sure, but this is my best guess.
In short, this may simply be how Airflow is built, and a workaround would be to change your start_date to yesterday.
TL;DR
I agree that kicking off a single DAG run for 10-24 when you turned the DAG on would be more natural.
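To make that expected behaviour concrete, here is a minimal sketch, assuming daily intervals are anchored at the configured start_date (2019-01-01 00:00) and that "now" is 2019-10-25 17:23 (the question does not state the year; 2019 is assumed from start_date). The helper `latest_completed_interval` is hypothetical, not an Airflow API: it returns the most recent interval that has fully elapsed, which is the single run one would intuitively expect with catchup=False.

```python
from datetime import datetime, timedelta


def latest_completed_interval(start_date, interval, now):
    """Hypothetical helper (not part of Airflow).

    Returns (interval_start, interval_end) for the most recent interval that
    has fully elapsed by `now`, with intervals anchored at start_date, or
    None if no interval has finished yet.
    """
    if now < start_date + interval:
        return None  # the very first interval has not ended yet
    # Number of whole intervals completed since start_date.
    completed = (now - start_date) // interval
    interval_start = start_date + (completed - 1) * interval
    return interval_start, interval_start + interval


start = datetime(2019, 1, 1)
now = datetime(2019, 10, 25, 17, 23)
# The interval 2019-10-24 00:00 -> 2019-10-25 00:00, i.e. one run for 10-24.
print(latest_completed_interval(start, timedelta(days=1), now))
```

Under this model exactly one interval is "due", and it is aligned to midnight rather than to the moment the DAG was switched on.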
However, according to your DAG runs, RUN 1 is for 10-23. This suggests to me that the first run is not being initialized correctly, so I looked into the scheduler code.
I have a doubt about one line in particular (the else branch below). It is inside the function that creates a DAG run and sets the start date for the run:
# The logic is that we move start_date up until
# one period before, so that timezone.utcnow() is AFTER
# the period end, and the job can be created...
now = timezone.utcnow()
# This returns current time + schedule_interval. In your example, this will be tomorrow.
next_start = dag.following_schedule(now)
# This returns current time - schedule_interval. In your example, this will be yesterday.
last_start = dag.previous_schedule(now)
# tomorrow <= now, so this condition is False
if next_start <= now:
    new_start = last_start
else:
    # This returns last_start - schedule_interval, i.e. 2 days ago.
    # Wondering if this is intended to be dag.previous_schedule(next_start)???
    new_start = dag.previous_schedule(last_start)
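The quoted branch can be replayed outside Airflow to see where the second run comes from. This is a minimal sketch under several assumptions: for a timedelta schedule_interval, dag.following_schedule and dag.previous_schedule are modeled as simply adding or subtracting one interval; the scheduler is modeled as creating one run for every interval that has ended by "now"; and the surrounding scheduler code is assumed to only move start_date forward (the later of the configured start_date and new_start wins). All function names here are hypothetical stand-ins, not Airflow APIs.

```python
from datetime import datetime, timedelta

INTERVAL = timedelta(days=1)  # schedule_interval=datetime.timedelta(days=1)


# Stand-ins (assumption) for dag.following_schedule / dag.previous_schedule
# with a timedelta schedule: add or subtract one interval.
def following_schedule(dttm):
    return dttm + INTERVAL


def previous_schedule(dttm):
    return dttm - INTERVAL


def catchup_false_start(now):
    """Replays the quoted branch and returns new_start."""
    next_start = following_schedule(now)  # tomorrow
    last_start = previous_schedule(now)   # yesterday
    if next_start <= now:                 # never true for a positive timedelta
        return last_start
    return previous_schedule(last_start)  # two intervals before now


def runs_created(configured_start, now):
    """Simplified model: one run per interval that has ended by `now`."""
    # Assumption: the scheduler only moves start_date forward, never back.
    execution_date = max(configured_start, catchup_false_start(now))
    runs = []
    while execution_date + INTERVAL <= now:
        runs.append(execution_date)
        execution_date += INTERVAL
    return runs


now = datetime(2019, 10, 25, 17, 23)

# Original config: start_date 2019-01-01, so new_start (two days ago) wins.
print([d.isoformat() for d in runs_created(datetime(2019, 1, 1), now)])
# -> ['2019-10-23T17:23:00', '2019-10-24T17:23:00']

# Workaround: start_date set to yesterday, which is later than new_start.
print([d.isoformat() for d in runs_created(datetime(2019, 10, 24), now)])
# -> ['2019-10-24T00:00:00']
```

In this model the extra run exists because new_start lands two intervals before now, so both the 10-23 17:23 and the 10-24 17:23 intervals have ended by 10-25 17:23. Setting start_date to yesterday puts it after new_start, which leaves only one completed interval.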