How to implement an Airflow DAG in a loop


Question


I just started with Airflow. I want to set up a DAG in a loop, where the next DAG starts when the previous DAG is completed. Here is the workflow that I want to achieve:

list_of_files = [......]
for file in list_of_files:
    dag = DAG('pipeline', default_args=default_args, schedule_interval=None)
    t1 = BashOperator('copy_this_file', ....)
    t2 = BashOperator('process_this_file', ...)
    t1.set_downstream(t2)


If I run airflow backfill pipeline -s 2019-05-01, all the DAGs are started simultaneously.

Answer


DAGs can't depend on each other, they are separate workflows. You want to configure tasks to depend on each other instead. You can have a single DAG with multiple execution branches, one for each file, something like this (not tested):

dag = DAG('pipeline', ...)
list_of_files = [......]
with dag:
    for file in list_of_files:
        t1 = BashOperator('copy_this_file', ....)
        t2 = BashOperator('process_this_file', ...)
        t1.set_downstream(t2)

