How can you re-run an upstream task if a downstream task fails in Airflow (using Sub DAGs)?
Problem description
I have an Airflow DAG that extracts data and then performs validation. If the validation fails, the extract needs to be re-run. If the validation succeeds, the DAG continues.
I've read people saying that sub DAGs can solve this problem, but I can't find any example of it. I've tried using a sub DAG, but I run into the same problem as when trying to do it in one DAG.
How can I get all tasks in the sub DAG to re-run if one of them fails?
I have the following DAG/sub DAG details:
maindag.py
from datetime import timedelta

# Airflow 1.x import paths
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator

from subdag import sub_dag  # the sub DAG factory defined in subdag.py below

# dag_id, start_date, sla_hours and sub_dag_task_id are defined elsewhere (elided in the question)
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': start_date,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'sla': timedelta(hours=sla_hours)
}

main_dag = DAG(
    dag_id,
    default_args=default_args,
    schedule_interval='30 14 * * *',
    max_active_runs=1,
    concurrency=1)

task1 = BashOperator(...)

task2 = SubDagOperator(
    task_id=sub_dag_task_id,
    subdag=sub_dag(dag_id, sub_dag_task_id, start_date, main_dag.schedule_interval),
    dag=main_dag)

task3 = BashOperator(...)
subdag.py
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

def sub_dag(parent_dag_name, task_id, start_date, schedule_interval):
    # A sub DAG's dag_id must be '<parent_dag_id>.<subdag_task_id>'
    dag = DAG(
        '%s.%s' % (parent_dag_name, task_id),
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    task1 = BashOperator(...)
    task2 = BashOperator(...)
    task3 = BashOperator(...)

    task1 >> task2 >> task3

    return dag
In the sub DAG, if task 3 fails, I want task 1 to run again even though it has succeeded. Why is this so hard to do?!
Recommended answer
I've found a solution to this by creating a retry callback method in the main DAG:

(original source: https://gist.github.com/nathairtras/6ce0b0294be8c27d672e2ad52e8f2117 )
from airflow.models import DagBag

def callback_subdag_clear(context):
    """Clears a subdag's tasks on retry."""
    # The sub DAG's dag_id is '<parent_dag_id>.<subdag_task_id>'
    dag_id = "{}.{}".format(
        context['dag'].dag_id,
        context['ti'].task_id
    )
    execution_date = context['execution_date']
    sdag = DagBag().get_dag(dag_id)
    # Clear every task instance for this execution date so they all re-run
    sdag.clear(
        start_date=execution_date,
        end_date=execution_date,
        only_failed=False,
        only_running=False,
        confirm_prompt=False,
        include_subdags=False)
Then the task that runs the SubDagOperator has:

on_retry_callback=callback_subdag_clear,
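For context, a minimal sketch of how this might look on the SubDagOperator in maindag.py, reusing the names from the snippets above (nothing new here beyond attaching the callback):

task2 = SubDagOperator(
    task_id=sub_dag_task_id,
    subdag=sub_dag(dag_id, sub_dag_task_id, start_date, main_dag.schedule_interval),
    on_retry_callback=callback_subdag_clear,  # clears the sub DAG's task instances before each retry
    dag=main_dag)

The SubDagOperator itself picks up retries=3 from default_args, so a failure anywhere in the sub DAG marks the operator for retry, the callback wipes the sub DAG's task instances for that execution date, and the whole sub DAG runs again from task 1.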
It now clears out the task instance history of each task and re-runs every task in the sub DAG, up to the number of retries set in the main DAG.