Airflow, mark a task success or skip it before dag run


Problem Description

We have a huge DAG, with many small and fast tasks and a few big, time-consuming tasks.

We want to run just a part of the DAG, and the easiest way we found is to simply not add the tasks we don't want to run. The problem is that our DAG has many co-dependencies, so it becomes a real challenge not to break the DAG when we want to skip some tasks.

Is there a way to add a status to a task by default (for every run)? Something like:

# get the skip list from an env variable
task_list = models.Variable.get('list_of_tasks_to_skip')

dag.skip(task_list)

for task in task_list:
    task.status = 'success'


Recommended Answer

As mentioned in the comments, you should use the BranchPythonOperator (or ShortCircuitOperator) to prevent the time-consuming tasks from executing. If you need downstream operators of these time-consuming tasks to run, you can use TriggerRule.ALL_DONE to have those operators run, but note that with this rule they will also run when the upstream operators fail.
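
A minimal sketch of how this could fit together (the DAG and task names are made up for illustration, and the import paths assume Airflow 2.x; on 1.x the operators live under airflow.operators.python_operator and airflow.operators.dummy_operator):

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

def choose_branch():
    # Hypothetical Variable: it holds the task_id of the branch to follow,
    # e.g. "time_consuming_task" or "skip_time_consuming".
    return Variable.get('time_consuming_operator_var',
                        default_var='skip_time_consuming')

with DAG('partial_run_example', start_date=datetime(2021, 1, 1),
         schedule_interval=None, catchup=False) as dag:
    branch = BranchPythonOperator(task_id='branch', python_callable=choose_branch)

    time_consuming_task = DummyOperator(task_id='time_consuming_task')
    skip_time_consuming = DummyOperator(task_id='skip_time_consuming')

    # ALL_DONE lets this task run once both branches reach a terminal state,
    # even when the time-consuming branch was skipped (or failed).
    downstream = DummyOperator(task_id='downstream',
                               trigger_rule=TriggerRule.ALL_DONE)

    branch >> [time_consuming_task, skip_time_consuming]
    [time_consuming_task, skip_time_consuming] >> downstream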

You can use Airflow Variables to affect these BranchPythonOperators without having to update the DAG, e.g.:

from airflow.models import Variable

def branch_python_operator_callable():
    # Return the task_id of the branch to follow, read from an Airflow Variable
    return Variable.get('time_consuming_operator_var')

and use branch_python_operator_callable as the Python callable for your BranchPythonOperator.
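
For example (a hypothetical wiring with a placeholder task_id; the Variable value must match the task_id of the branch you want to run, so you can switch it from the Airflow UI or CLI without redeploying the DAG):

# assumes BranchPythonOperator is imported (airflow.operators.python in Airflow 2.x)
run_or_skip = BranchPythonOperator(
    task_id='run_or_skip_time_consuming',
    python_callable=branch_python_operator_callable,
    dag=dag,
)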

