Airflow issue with branching tasks
Problem description
I am trying to set up a DAG where one task runs every minute and another task runs on the 5th minute (right before the 1 minute task). This is really just for testing; I am not planning to run jobs at such short intervals.
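To pin down the intended behaviour before looking at the DAG, here is a plain-Python sketch of what each run should execute. It is not Airflow code; `tasks_for_run` is a hypothetical helper, and the task_ids are the ones used in the DAG below.

```python
from datetime import datetime


def tasks_for_run(run_time):
    """Return, in order, the task_ids one run should execute (toy model)."""
    tasks = []
    if run_time.minute % 5 == 0:
        # On every 5th minute the 5 minute job goes first...
        tasks.append("branch_fiveminute")
    # ...and the 1 minute job runs on every run, including the 5th minute.
    tasks.append("branch_minute")
    return tasks


if __name__ == "__main__":
    for minute in range(3, 7):
        print(minute, tasks_for_run(datetime(2018, 10, 9, 12, minute)))
```

Only minute 5 yields both task_ids; the other minutes yield `branch_minute` alone. The key requirement is that `branch_minute` appears in every run.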
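Visually, my DAG looks like this (graph image not preserved): `branch_task` fans out to `branch_minute` and `branch_fiveminute`, and `branch_fiveminute` also feeds into `branch_minute`.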
The code itself looks like this:
```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 9)
}

# Module-level code: evaluated whenever the DAG file is parsed,
# not once per DAG run.
now = datetime.now()
minute_check = now.minute % 5

dag = DAG(
    dag_id='test3',
    default_args=default_args,
    schedule_interval='* * * * *',  # every minute
    dagrun_timeout=timedelta(minutes=5),
    catchup=False,
    max_active_runs=99
)


def check_minute():
    # Return the task_id of the branch to follow.
    if minute_check == 0:
        return "branch_fiveminute"
    else:
        return "branch_minute"


branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=check_minute,
    trigger_rule='all_done',
    dag=dag)

# The trailing space in bash_command stops Airflow from trying to load
# the ".sh" string as a Jinja template file.
branch_minute = BashOperator(
    task_id='branch_minute',
    bash_command='test1min.sh ',
    trigger_rule='all_done',
    dag=dag)

branch_fiveminute = BashOperator(
    task_id='branch_fiveminute',
    bash_command='test5min.sh ',
    trigger_rule='all_done',
    dag=dag)

# branch_minute is both a direct child of branch_task and
# downstream of branch_fiveminute.
branch_task.set_downstream(branch_minute)
branch_task.set_downstream(branch_fiveminute)
branch_fiveminute.set_downstream(branch_minute)
```
The problem I am getting is that, on the 5th minute, Airflow skips the 1 minute task (`branch_minute`).
I have tried playing around with the trigger_rule settings without much success.
Any ideas what's wrong? I am using Airflow 1.10, if it matters.
Recommended answer
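Since you follow a different execution path for the 5 minute task, the 1 minute task gets skipped. It's a little counter-intuitive from the diagram, but only one path will execute: `BranchPythonOperator` marks every task directly downstream of it that it did not return as skipped. `branch_minute` is one of those direct children, so it is skipped on the 5th minute no matter what its `trigger_rule` says.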
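The skipping rule can be illustrated with a toy model in plain Python. This is a simplification written for illustration, not Airflow's implementation; `QUESTION_DAG` and `apply_branch` are hypothetical names, and the edges are copied from the question's `set_downstream` calls.

```python
# The question's DAG as task_id -> direct downstream task_ids.
QUESTION_DAG = {
    "branch_task": ["branch_minute", "branch_fiveminute"],
    "branch_fiveminute": ["branch_minute"],
    "branch_minute": [],
}


def apply_branch(dag, branch_task, chosen):
    """Toy model: the chosen direct child may run, every other one is skipped."""
    return {
        child: ("will run" if child == chosen else "skipped")
        for child in dag[branch_task]
    }


if __name__ == "__main__":
    # On the 5th minute the callable returns "branch_fiveminute", so
    # branch_minute is skipped because it is also a direct child of
    # branch_task, even though it sits downstream of branch_fiveminute too.
    print(apply_branch(QUESTION_DAG, "branch_task", "branch_fiveminute"))
```

In this model, the only way to keep `branch_minute` alive on both paths is to stop making it a direct child of the branch task, which is what the restructured DAG does.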
So what you have to do is put the branch at the beginning, with one path leading into a dummy operator (the "false" path) and the other leading to the 5 minute task. Both the 5 minute task and the dummy operator then lead into the 1 minute task.
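This way one of the two middle tasks gets skipped (the dummy task on the 5th minute, the 5 minute task otherwise), but because `branch_minute` keeps `trigger_rule='all_done'`, the execution flow ends up in the 1 minute task regardless of which path is selected.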
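To check that flow, here is a small plain-Python simulation of the restructured DAG. It is a simplified sketch, not Airflow code: only the `success` and `skipped` states are modelled, and `FIXED_DAG`, `TRIGGER_RULES`, `decide` and `simulate` are hypothetical names. The trigger rules mirror the ones set in the DAG below.

```python
# task_id -> direct downstream task_ids, listed in topological order.
FIXED_DAG = {
    "branch_task": ["branch_false_1", "branch_fiveminute"],
    "branch_false_1": ["branch_minute"],
    "branch_fiveminute": ["branch_minute"],
    "branch_minute": [],
}
# Tasks not listed here use the default rule, all_success.
TRIGGER_RULES = {
    "branch_task": "all_done",
    "branch_fiveminute": "all_done",
    "branch_minute": "all_done",
}


def parents(dag, task_id):
    return [t for t, children in dag.items() if task_id in children]


def decide(rule, upstream_states):
    if rule == "all_done":
        # Parents are visited first, so they are all finished here;
        # "success" and "skipped" both count as done.
        return "success"
    # all_success: a skipped parent propagates the skip downstream.
    return "skipped" if "skipped" in upstream_states else "success"


def simulate(dag, branch_task, chosen, rules):
    states = {}
    for task in dag:  # dict order is a valid topological order here
        if task in states:
            continue  # already skipped by the branch task
        upstream = [states[p] for p in parents(dag, task)]
        states[task] = decide(rules.get(task, "all_success"), upstream)
        if task == branch_task:
            # Branch semantics: skip every direct child that was not chosen.
            for child in dag[task]:
                if child != chosen:
                    states[child] = "skipped"
    return states


if __name__ == "__main__":
    for chosen in ("branch_fiveminute", "branch_false_1"):
        result = simulate(FIXED_DAG, "branch_task", chosen, TRIGGER_RULES)
        print(chosen, "->", result["branch_minute"])
```

Both choices end with `branch_minute` in `success`. Passing an empty rules dict, so that `branch_minute` falls back to `all_success`, makes it `skipped` when `branch_fiveminute` is chosen, which is why `all_done` matters on the joining task.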
```python
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 9)
}

now = datetime.now()
minute_check = now.minute % 5

dag = DAG(
    dag_id='test3',
    default_args=default_args,
    schedule_interval='* * * * *',
    dagrun_timeout=timedelta(minutes=5),
    catchup=False,
    max_active_runs=99
)


def check_minute():
    # Choose between the 5 minute task and the dummy "false" path;
    # branch_minute is no longer a direct child of the branch.
    if minute_check == 0:
        return "branch_fiveminute"
    else:
        return "branch_false_1"


branch_task = BranchPythonOperator(
    task_id='branch_task',
    python_callable=check_minute,
    trigger_rule='all_done',
    dag=dag)

# 'all_done' lets branch_minute run once both parents have finished,
# even though one of them is always skipped.
branch_minute = BashOperator(
    task_id='branch_minute',
    bash_command='test1min.sh ',
    trigger_rule='all_done',
    dag=dag)

branch_fiveminute = BashOperator(
    task_id='branch_fiveminute',
    bash_command='test5min.sh ',
    trigger_rule='all_done',
    dag=dag)

# Placeholder task for the "not the 5th minute" path.
branch_false_1 = DummyOperator(task_id="branch_false_1", dag=dag)

branch_task.set_downstream(branch_false_1)
branch_task.set_downstream(branch_fiveminute)
branch_fiveminute.set_downstream(branch_minute)
branch_false_1.set_downstream(branch_minute)
```
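In both versions `minute_check` is computed at module level, so it reflects the wall-clock time at which the DAG file was parsed, not a timestamp belonging to the DAG run. One option is to read the run's own timestamp inside the callable. The sketch below assumes the `BranchPythonOperator` is created with `provide_context=True` (Airflow 1.10), so that the task context, including `execution_date`, is passed as keyword arguments; `choose_branch` is a hypothetical helper. It runs without Airflow installed.

```python
from datetime import datetime


def choose_branch(run_time):
    """Pick the branch task_id from the minute of run_time."""
    if run_time.minute % 5 == 0:
        return "branch_fiveminute"
    return "branch_false_1"


def check_minute(**context):
    # Assumption: provide_context=True on the BranchPythonOperator, so
    # Airflow 1.10 passes "execution_date" among the keyword arguments.
    return choose_branch(context["execution_date"])


if __name__ == "__main__":
    print(check_minute(execution_date=datetime(2018, 10, 9, 12, 5)))
    print(check_minute(execution_date=datetime(2018, 10, 9, 12, 6)))
```

Keep in mind that in Airflow 1.10 a run's `execution_date` is the start of its schedule interval. With `* * * * *`, the run that starts at about 12:05 carries `execution_date` 12:04, so shift the check by one interval if you want to key off the actual start time.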