How to skip tasks on Airflow?


Question

I'm trying to understand whether Airflow supports skipping tasks in a DAG for ad-hoc executions.

Let's say my DAG graph looks like this: task1 > task2 > task3 > task4

And I would like to start my DAG manually from task3, what is the best way of doing that?

I've read about ShortCircuitOperator, but I'm looking for a more ad-hoc solution that can be applied once the execution is triggered.

Thanks!

Solution

You can incorporate the SkipMixin that the ShortCircuitOperator uses under the hood to skip downstream tasks.

from airflow.models import BaseOperator, SkipMixin
from airflow.utils.decorators import apply_defaults


class mySkippingOperator(BaseOperator, SkipMixin):
    """Skip all downstream tasks when `condition` is falsy."""

    @apply_defaults
    def __init__(self, condition, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.condition = condition

    def execute(self, context):
        # A truthy condition lets the DAG continue as usual.
        if self.condition:
            self.log.info('Proceeding with downstream tasks...')
            return

        self.log.info('Skipping downstream tasks...')

        # Collect every task downstream of this one, not only its direct children.
        downstream_tasks = context['task'].get_flat_relatives(upstream=False)

        self.log.debug("Downstream task_ids %s", downstream_tasks)

        if downstream_tasks:
            # Mark the collected tasks as skipped for this DAG run.
            self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

        self.log.info("Done.")
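
To see which tasks such an operator would end up skipping, here is a minimal plain-Python sketch that models the downstream lookup on the DAG from the question. It does not use Airflow at all: the `DOWNSTREAM` mapping and the `flat_downstream` and `tasks_to_skip` helpers are hypothetical names made up for illustration, and the traversal only approximates what `get_flat_relatives(upstream=False)` collects.

```python
from collections import deque

# Hypothetical stand-in for the DAG in the question: task1 > task2 > task3 > task4.
# Maps each task_id to the task_ids directly downstream of it.
DOWNSTREAM = {
    "task1": ["task2"],
    "task2": ["task3"],
    "task3": ["task4"],
    "task4": [],
}


def flat_downstream(task_id, graph):
    """Return every task reachable downstream of task_id, excluding task_id itself."""
    seen = []
    queue = deque(graph[task_id])
    while queue:
        current = queue.popleft()
        if current in seen:
            continue  # already collected via another path
        seen.append(current)
        queue.extend(graph[current])
    return seen


def tasks_to_skip(task_id, condition, graph):
    """Mirror the operator's branch: a truthy condition skips nothing."""
    return [] if condition else flat_downstream(task_id, graph)


if __name__ == "__main__":
    print(tasks_to_skip("task2", False, DOWNSTREAM))
    print(tasks_to_skip("task2", True, DOWNSTREAM))
```

In this model only tasks after the skipping task are affected, and nothing upstream of it changes state, so where the operator sits in the chain determines which part of the DAG can be skipped.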
