how to implement airflow DAG in a loop
Question
I just started with Airflow. I want to set up a DAG in a loop, where the next DAG starts when the previous DAG is completed. Here is the workflow that I want to achieve:
list_of_files = [......]
for file in list_of_files:
    dag = DAG('pipeline', default_args=default_args, schedule_interval=None)
    t1 = BashOperator('copy_this_file', ....)
    t2 = BashOperator('process_this_file', ...)
    t1.set_downstream(t2)
If I run airflow backfill pipeline -s 2019-05-01, all the DAGs are started simultaneously.
Answer
DAGs can't depend on each other; they are separate workflows. You want to configure tasks to depend on each other instead. You can have a single DAG with multiple execution branches, one for each file, something like this (not tested):
dag = DAG('pipeline', ...)
list_of_files = [......]

with dag:
    for file in list_of_files:
        t1 = BashOperator('copy_this_file', ....)
        t2 = BashOperator('process_this_file', ...)
        t1.set_downstream(t2)