Airflow 将任务实例状态设置为以编程方式跳过 [英] Airflow set task instance status as skipped programmatically
本文介绍了Airflow 将任务实例状态设置为以编程方式跳过的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我有循环创建任务的列表.就大小而言,该列表是静态的.
for counter, account_id in enumerate(ACCOUNT_LIST):task_id = fbash_task_{counter}";如果 account_id:trigger_task = BashOperator(task_id=task_id,bash_command="echo hello there",dag=dag)别的:trigger_task = BashOperator(task_id=task_id,bash_command="echo hello there",dag=dag)trigger_task.status = SKIPPED # 有没有办法以某种方式将 this 的状态设置为跳过而不是使用分支运算符?触发任务
我手动尝试过,但无法跳过任务:
start = DummyOperator(task_id='start')task1 = DummyOperator(task_id='task_1')task2 = DummyOperator(task_id='task_2')task3 = DummyOperator(task_id='task_3')task4 = DummyOperator(task_id='task_4')开始 >>任务1开始 >>任务 2尝试:开始 >>任务 3引发 AirflowSkipException除了 AirflowSkipException 作为 ase:log.error('Task 跳过了 task3')尝试:开始 >>任务 4引发 AirflowSkipException除了 AirflowSkipException 作为 ase:log.error('task 4 的任务被跳过')
解决方案
是的,你需要raise AirflowSkipException
fromairflow.exceptions import AirflowSkipException引发 AirflowSkipException
有关更多信息,请参阅源代码>
I have list that I loop to create the tasks. The list are static as far as size.
for counter, account_id in enumerate(ACCOUNT_LIST):
task_id = f"bash_task_{counter}"
if account_id:
trigger_task = BashOperator(
task_id=task_id,
bash_command="echo hello there",
dag=dag)
else:
trigger_task = BashOperator(
task_id=task_id,
bash_command="echo hello there",
dag=dag)
trigger_task.status = SKIPPED # is there way to somehow set status of this to skipped instead of having a branch operator?
trigger_task
I tried this manually but cannot make the task skipped:
start = DummyOperator(task_id='start')
task1 = DummyOperator(task_id='task_1')
task2 = DummyOperator(task_id='task_2')
task3 = DummyOperator(task_id='task_3')
task4 = DummyOperator(task_id='task_4')
start >> task1
start >> task2
try:
start >> task3
raise AirflowSkipException
except AirflowSkipException as ase:
log.error('Task Skipped for task3')
try:
start >> task4
raise AirflowSkipException
except AirflowSkipException as ase:
log.error('Task Skipped for task4')
解决方案
yes there you need to raise AirflowSkipException
from airflow.exceptions import AirflowSkipException
raise AirflowSkipException
For more information see the source code
这篇关于Airflow 将任务实例状态设置为以编程方式跳过的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文