Airflow: rerun a single task multiple times on success


Question

What is the best way to rerun a task (A) 3 times sequentially?

That is: task A -> task A -> task A -> task B

I ask because I will then run a separate data validation task (B) that compares the data from those 3 runs.

This is what I have so far:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG("hello_world_0", description="Starting tutorial", schedule_interval='* * * * *',
          start_date=datetime(2019, 1, 1),
          catchup=False)

# Three copies of the same pull task, one per attempt
data_pull_1 = BashOperator(task_id='attempt_1', bash_command='echo "Hello World - 1!"', dag=dag)
data_pull_2 = BashOperator(task_id='attempt_2', bash_command='echo "Hello World - 2!"', dag=dag)
data_pull_3 = BashOperator(task_id='attempt_3', bash_command='echo "Hello World - 3!"', dag=dag)

data_validation = BashOperator(task_id='data_validation', bash_command='echo "Data Validation!"', dag=dag)

# Run the three attempts in order, then validate
data_pull_1 >> data_pull_2 >> data_pull_3 >> data_validation

This might work, but is there a more elegant way?

Recommended answer

You can try the implementation below, which creates the 3 operators in a for loop:

from datetime import datetime

from airflow import DAG
# Airflow 1.x import path; in Airflow 2.x the module is airflow.operators.bash
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    "hello_world_0",
    description="Starting tutorial",
    schedule_interval=None,
    start_date=datetime(2019, 1, 1),
    catchup=False
)

chain_operators = []
max_attempt = 3
for attempt in range(max_attempt):
    data_pull = BashOperator(
        task_id='attempt_{}'.format(attempt),
        bash_command='echo "Hello World - {}!"'.format(attempt),
        dag=dag
    )
    chain_operators.append(data_pull)

data_validation = BashOperator(task_id='data_validation', bash_command='echo "Data Validation!"', dag=dag)
chain_operators.append(data_validation)

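# Link each operator to the next one in the list, giving
# attempt_0 >> attempt_1 >> attempt_2 >> data_validation
for i, val in enumerate(chain_operators[:-1]):
    val.set_downstream(chain_operators[i + 1])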

I changed schedule_interval to None because with '* * * * *' the DAG would be triggered every minute.
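
To see what the chaining loop does without an Airflow install, here is a minimal sketch that runs in plain Python. FakeTask, build_chain and execution_order are hypothetical names made up for this illustration and are not part of Airflow; FakeTask only imitates the set_downstream method and the >> operator that the real operators are used with above. The sketch also pairs each task with its successor via zip, which is one possible alternative to indexing with enumerate.

```python
class FakeTask:
    """Hypothetical stand-in for an Airflow operator; it only records dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other)

    def __rshift__(self, other):
        # Imitates the `a >> b` style: register b downstream of a, return b
        self.set_downstream(other)
        return other


def build_chain(max_attempt=3):
    """Create max_attempt pull tasks plus one validation task, linked in order."""
    tasks = [FakeTask("attempt_{}".format(i)) for i in range(max_attempt)]
    tasks.append(FakeTask("data_validation"))
    # Pair each task with its successor instead of indexing by position
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream
    return tasks


def execution_order(first):
    """Follow the single downstream link from the first task to the last."""
    order = []
    node = first
    while node is not None:
        order.append(node.task_id)
        node = node.downstream[0] if node.downstream else None
    return order


tasks = build_chain()
print(" -> ".join(execution_order(tasks[0])))
# prints: attempt_0 -> attempt_1 -> attempt_2 -> data_validation
```

In the real DAG the same zip loop should work on the chain_operators list from the answer, since the question's code already chains operators with >>.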
