Run airflow DAG for each file

Question

So I have this quite nice DAG in Airflow which basically runs several analysis steps (implemented as Airflow plugins) on binary files. The DAG is triggered by an FTP sensor which simply checks whether there is a new file on the FTP server and then starts the whole workflow.

So currently the workflow is like this: the DAG is triggered as scheduled -> the sensor waits for a new file on the FTP server -> the analysis steps are executed -> end of workflow.

What I'd like to have is something like this: the DAG is triggered -> the sensor waits for a new file on the FTP server -> for every file on the server the analysis steps are executed individually -> each workflow ends individually.

How do I get the analysis workflow to be executed for each file on the FTP server, while keeping just one sensor waiting for new files when the server is empty? I don't want to, e.g., start a DAG every second or so, because then I would have many sensors all waiting for a new file.

Answer

Use two DAGs to separate the sensing step from the analysis steps.

DAG 1:

the sensor waits for a new file on the FTP server -> once a new file lands, use a TriggerDagRunOperator to re-trigger DAG 1 itself (so the sensor starts waiting again) -> use another TriggerDagRunOperator to trigger DAG 2
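A minimal sketch of DAG 1, assuming Airflow 2.x with the FTP provider package installed. The connection id `ftp_default`, the remote path `/incoming/`, the DAG ids, and passing the file via `conf` are all assumptions for illustration; the answer itself does not specify how the filename is handed over.

```python
# Sketch of DAG 1 (the sensing DAG). Started once (e.g. manually);
# thereafter it re-triggers itself each time a file lands.
from datetime import datetime

from airflow import DAG
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.ftp.sensors.ftp import FTPSensor

with DAG(
    dag_id="ftp_sensing_dag",          # assumed name
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,            # runs only when triggered
    catchup=False,
) as dag:
    # Wait until a new file shows up on the FTP server.
    wait_for_file = FTPSensor(
        task_id="wait_for_file",
        path="/incoming/",             # placeholder remote path
        ftp_conn_id="ftp_default",     # placeholder connection id
    )

    # Re-trigger this DAG so one sensor is always waiting again.
    retrigger_self = TriggerDagRunOperator(
        task_id="retrigger_self",
        trigger_dag_id="ftp_sensing_dag",
    )

    # Kick off the analysis DAG for the file that just landed.
    trigger_analysis = TriggerDagRunOperator(
        task_id="trigger_analysis",
        trigger_dag_id="ftp_analysis_dag",
        conf={"file_path": "/incoming/new_file.bin"},  # assumed handover
    )

    wait_for_file >> [retrigger_self, trigger_analysis]
```

Because the re-trigger and the analysis trigger run in parallel after the sensor fires, the sensing DAG goes right back to waiting while each file's analysis proceeds in its own DAG 2 run.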

DAG 2:

Perform the analysis steps on the file.
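A matching sketch of DAG 2, again assuming Airflow 2.x. The original analysis steps are the author's custom plugins, so a placeholder `PythonOperator` stands in for them here; reading `file_path` from `dag_run.conf` mirrors the assumed handover convention from the DAG 1 sketch.

```python
# Sketch of DAG 2 (the analysis DAG), triggered once per file by DAG 1.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def analyze(**context):
    # Retrieve the file path passed by the triggering DAG run
    # (the "file_path" conf key is an assumption, not from the answer).
    file_path = context["dag_run"].conf.get("file_path")
    print(f"Running analysis steps on {file_path}")

with DAG(
    dag_id="ftp_analysis_dag",         # must match trigger_dag_id in DAG 1
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,            # runs only when triggered by DAG 1
    catchup=False,
) as dag:
    run_analysis = PythonOperator(
        task_id="run_analysis",
        python_callable=analyze,
    )
```

Each triggered run of this DAG handles exactly one file and ends on its own, which gives the per-file, individually-ending workflows the question asks for.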
