有没有办法为同一 DAG 中的任务配置不同的“重试" [英] Is there a way to configure different 'retries' for tasks in the same DAG

查看:15
本文介绍了有没有办法为同一 DAG 中的任务配置不同的“重试"的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个包含许多子任务的 DAG.在 DAG 的中间,有一个验证任务,根据任务的结果/返回代码,我想采用两条不同的路径.如果成功,将遵循一条路线(一系列任务),如果失败,我们将执行一组不同的任务.当前方法有两个问题,一是如果退出代码为 1,验证任务将执行多次(根据配置的重试次数).二是无法采取不同的执行分支

为了解决问题 1,我们可以使用任务实例中可用的重试次数,它可以通过宏 {{ task_instance }} 获得.感谢有人能给我们指出更简洁的方法,而采取不同路径的问题 2 仍未解决.

解决方案

您可以在任务级别进行重试.

run_this = BashOperator(task_id='run_after_loop',bash_command='echo 1',重试=3,达格=达格,)run_this_last = DummyOperator(task_id='run_this_last',重试=1,达格=达格,)

关于您的第二个问题,有一个

<块引用>

BranchPythonOperator 很像 PythonOperator 除了它需要一个 python_callable 返回一个 task_id(或 task_id 列表).遵循返回的 task_id,并跳过所有其他路径.Python 函数返回的 task_id 必须直接引用 BranchPythonOperator 任务下游的任务.

示例 DAG:

随机导入进口气流从airflow.models导入DAG从airflow.operators.dummy_operator 导入DummyOperator从airflow.operators.python_operator 导入BranchPythonOperator参数 = {'所有者':'气流','开始日期':airflow.utils.dates.days_ago(2),}DAG = DAG(dag_id='example_branch_operator',default_args=args,schedule_interval="@daily",)run_this_first = DummyOperator(task_id='run_this_first',达格=达格,)选项 = ['branch_a', 'branch_b', 'branch_c', 'branch_d']分支 = BranchPythonOperator(task_id='分支',python_callable=lambda: random.choice(options),达格=达格,)run_this_first >>分枝加入 = DummyOperator(task_id='加入',trigger_rule='one_success',达格=达格,)对于选项中的选项:t = DummyOperator(task_id=选项,达格=达格,)dummy_follow = DummyOperator(task_id='follow_' + 选项,达格=达格,)分支>>>>dummy_follow >>加入

I have a DAG with many sub-tasks in it. In the middle of the DAG, there is a validation task and based on the result/return code from the task, i want to take two different paths. If success, one route(a sequence of tasks) will be followed and in case of failure, we would like to execute a different set of tasks. There are two problems with the current approach, one is that, validation tasks execute many times(as per the retries configured) if the exit code is 1. Second there is no way possible to take different branches of execution

To solve problem number 1, we can use the retry number is available from the task instance, which is available via the macro {{ task_instance }} . Appreciate if someone could point us to a cleaner approach, and the problem number 2 of taking different paths remains unsolved.

解决方案

You can have retries at the task level.

run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    retries=3,
    dag=dag,
)

run_this_last = DummyOperator(
    task_id='run_this_last',
    retries=1,
    dag=dag,
)

Regarding your 2nd problem, there is a concept of Branching.

The BranchPythonOperator is much like the PythonOperator except that it expects a python_callable that returns a task_id (or list of task_ids). The task_id returned is followed, and all of the other paths are skipped. The task_id returned by the Python function has to be referencing a task directly downstream from the BranchPythonOperator task.

Example DAG:

import random

import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}

dag = DAG(
    dag_id='example_branch_operator',
    default_args=args,
    schedule_interval="@daily",
)

run_this_first = DummyOperator(
    task_id='run_this_first',
    dag=dag,
)

options = ['branch_a', 'branch_b', 'branch_c', 'branch_d']

branching = BranchPythonOperator(
    task_id='branching',
    python_callable=lambda: random.choice(options),
    dag=dag,
)
run_this_first >> branching

join = DummyOperator(
    task_id='join',
    trigger_rule='one_success',
    dag=dag,
)

for option in options:
    t = DummyOperator(
        task_id=option,
        dag=dag,
    )

    dummy_follow = DummyOperator(
        task_id='follow_' + option,
        dag=dag,
    )

    branching >> t >> dummy_follow >> join

这篇关于有没有办法为同一 DAG 中的任务配置不同的“重试"的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆