How to create a conditional task in Airflow
Problem description
I would like to create a conditional task in Airflow as described in the schema below. The expected scenario is the following:
- Task 1 executes
- If Task 1 succeeds, then execute Task 2a
- Else, if Task 1 fails, then execute Task 2b
- Finally, execute Task 3
All tasks above are SSHExecuteOperator tasks. I'm guessing I should use the ShortCircuitOperator and/or XCom to manage the condition, but I am not clear on how to implement that. Could you please describe the solution?
Recommended answer
You have to use Airflow trigger rules.
All operators have a trigger_rule argument, which defines the rule by which the generated task gets triggered.
The possible trigger rules are:
ALL_SUCCESS = 'all_success'
ALL_FAILED = 'all_failed'
ALL_DONE = 'all_done'
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'
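To make the rules above more concrete, here is a minimal sketch of how each one could be evaluated once every upstream task has finished. It is a simplified model, not Airflow's own implementation: the function name should_run and the two state strings are hypothetical, and it ignores skipped upstream tasks as well as the fact that one_success and one_failed can fire before all upstream tasks are done.

```python
# Simplified model of trigger rules; NOT Airflow's implementation.
# `should_run`, SUCCESS and FAILED are hypothetical names for illustration.
SUCCESS = "success"
FAILED = "failed"


def should_run(trigger_rule, upstream_states):
    """Return True if a task with `trigger_rule` would run, given the
    final states of all of its direct upstream tasks."""
    total = len(upstream_states)
    succeeded = sum(1 for state in upstream_states if state == SUCCESS)
    failed = sum(1 for state in upstream_states if state == FAILED)

    if trigger_rule == "all_success":
        return succeeded == total
    if trigger_rule == "all_failed":
        return failed == total
    if trigger_rule == "all_done":
        # Every upstream task finished, whatever its outcome.
        return succeeded + failed == total
    if trigger_rule == "one_success":
        return succeeded >= 1
    if trigger_rule == "one_failed":
        return failed >= 1
    if trigger_rule == "dummy":
        # Dependencies are only for show; the task can always run.
        return True
    raise ValueError("unknown trigger rule: %r" % trigger_rule)


if __name__ == "__main__":
    for rule in ("all_success", "all_failed", "one_success", "one_failed"):
        print(rule, should_run(rule, [SUCCESS, FAILED]))
```

With one succeeded and one failed upstream task, only the one_success, one_failed, all_done and dummy rules let the task run in this model.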
Here is the idea to solve your problem:
# Import paths are those given in the answer; they vary across Airflow versions.
from airflow.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.contrib.hooks import SSHHook

# `dag` is assumed to be a DAG object defined elsewhere in the file.
sshHook = SSHHook(conn_id=<YOUR CONNECTION ID FROM THE UI>)

task_1 = SSHExecuteOperator(
    task_id='task_1',
    bash_command=<YOUR COMMAND>,
    ssh_hook=sshHook,
    dag=dag)

# The success or failure of this task decides which branch runs.
task_2 = SSHExecuteOperator(
    task_id='conditional_task',
    bash_command=<YOUR COMMAND>,
    ssh_hook=sshHook,
    dag=dag)

# Runs only if its upstream task (task_2) succeeded.
task_2a = SSHExecuteOperator(
    task_id='task_2a',
    bash_command=<YOUR COMMAND>,
    trigger_rule=TriggerRule.ALL_SUCCESS,
    ssh_hook=sshHook,
    dag=dag)

# Runs only if its upstream task (task_2) failed.
task_2b = SSHExecuteOperator(
    task_id='task_2b',
    bash_command=<YOUR COMMAND>,
    trigger_rule=TriggerRule.ALL_FAILED,
    ssh_hook=sshHook,
    dag=dag)

# Runs as soon as either task_2a or task_2b has succeeded.
task_3 = SSHExecuteOperator(
    task_id='task_3',
    bash_command=<YOUR COMMAND>,
    trigger_rule=TriggerRule.ONE_SUCCESS,
    ssh_hook=sshHook,
    dag=dag)

# Wire the dependencies: task_1 -> task_2 -> (task_2a | task_2b) -> task_3
task_2.set_upstream(task_1)
task_2a.set_upstream(task_2)
task_2b.set_upstream(task_2)
task_3.set_upstream(task_2a)
task_3.set_upstream(task_2b)
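The effect of this wiring can be traced with a small stand-alone simulation. This is only a sketch under stated assumptions: it does not import Airflow, the function trace and the "not_run" label are hypothetical, task_1 is assumed to succeed, and every task that is triggered is assumed to succeed. In a real run, Airflow would mark the branch that does not run with one of its own states rather than "not_run".

```python
# Stand-alone trace of the DAG above; does not use Airflow.
# `trace` and the "not_run" label are hypothetical, for illustration only.
def trace(conditional_task_succeeds):
    """Return the modelled state of each task downstream of task_1,
    assuming task_1 succeeded and every triggered task succeeds."""
    states = {}
    states["task_2"] = "success" if conditional_task_succeeds else "failed"

    # task_2a uses all_success: runs only when task_2 succeeded.
    states["task_2a"] = "success" if states["task_2"] == "success" else "not_run"

    # task_2b uses all_failed: runs only when task_2 failed.
    states["task_2b"] = "success" if states["task_2"] == "failed" else "not_run"

    # task_3 uses one_success: runs if either branch succeeded.
    branches = (states["task_2a"], states["task_2b"])
    states["task_3"] = "success" if "success" in branches else "not_run"
    return states


if __name__ == "__main__":
    print(trace(True))
    print(trace(False))
```

In both cases exactly one of task_2a and task_2b runs, and task_3 runs afterwards, which matches the scenario in the question.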