Airflow 将任务实例状态设置为以编程方式跳过 [英] Airflow set task instance status as skipped programmatically

查看:44
本文介绍了Airflow 将任务实例状态设置为以编程方式跳过的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有循环创建任务的列表.就大小而言,该列表是静态的.

 for counter, account_id in enumerate(ACCOUNT_LIST):task_id = fbash_task_{counter}";如果 account_id:trigger_task = BashOperator(task_id=task_id,bash_command="echo hello there",dag=dag)别的:trigger_task = BashOperator(task_id=task_id,bash_command="echo hello there",dag=dag)trigger_task.status = SKIPPED # 有没有办法以某种方式将 this 的状态设置为跳过而不是使用分支运算符?触发任务

我手动尝试过,但无法跳过任务:

 start = DummyOperator(task_id='start')task1 = DummyOperator(task_id='task_1')task2 = DummyOperator(task_id='task_2')task3 = DummyOperator(task_id='task_3')task4 = DummyOperator(task_id='task_4')开始 >>任务1开始 >>任务 2尝试:开始 >>任务 3引发 AirflowSkipException除了 AirflowSkipException 作为 ase:log.error('Task 跳过了 task3')尝试:开始 >>任务 4引发 AirflowSkipException除了 AirflowSkipException 作为 ase:log.error('task 4 的任务被跳过')

解决方案

是的,你需要raise AirflowSkipException

fromairflow.exceptions import AirflowSkipException引发 AirflowSkipException

有关更多信息,请参阅源代码

I have list that I loop to create the tasks. The list are static as far as size.

        for counter, account_id in enumerate(ACCOUNT_LIST):
            task_id = f"bash_task_{counter}"
            if account_id:
                trigger_task = BashOperator(
                    task_id=task_id,
                    bash_command="echo hello there",
                    dag=dag)
            else:
                trigger_task = BashOperator(
                    task_id=task_id,
                    bash_command="echo hello there",
                    dag=dag)
                trigger_task.status = SKIPPED # is there way to somehow set status of this to skipped instead of having a branch operator?
            trigger_task

I tried this manually but cannot make the task skipped:

        start = DummyOperator(task_id='start')
        task1 = DummyOperator(task_id='task_1')
        task2 = DummyOperator(task_id='task_2')
        task3 = DummyOperator(task_id='task_3')
        task4 = DummyOperator(task_id='task_4')

        start >> task1
        start >> task2

        try:
            start >> task3
            raise AirflowSkipException
        except AirflowSkipException as ase:
            log.error('Task Skipped for task3')
            
        try:
            start >> task4
            raise AirflowSkipException
        except AirflowSkipException as ase:
            log.error('Task Skipped for task4')

解决方案

yes there you need to raise AirflowSkipException

from airflow.exceptions import AirflowSkipException

raise AirflowSkipException

For more information see the source code

这篇关于Airflow 将任务实例状态设置为以编程方式跳过的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆