Airflow: Proper way to run DAG for each file


Problem description

I have the following task to solve:


Files are being sent at irregular times through an endpoint and stored locally. I need to trigger a DAG run for each of these files. For each file, the same tasks will be performed.

Overall the flow looks as follows: for each file, run tasks A -> B -> C -> D.

Files are being processed in batches. While this task seemed trivial to me, I have found several ways to do it, and I am confused about which one is the "proper" one (if any).

The first option: expose a web service which ingests the request and the file, stores it to a folder, and uses the experimental REST API (https://airflow.apache.org/docs/stable/api.html) to trigger the DAG, passing the file_id as conf.
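A minimal sketch of that call, assuming a default webserver at localhost:8080 and a hypothetical process_file DAG (the experimental endpoint expects conf as a JSON string, as in the docs' curl example):

import json

import requests

AIRFLOW_URL = "http://localhost:8080"  # assumption: default webserver address
DAG_ID = "process_file"                # hypothetical DAG id

def trigger_dag_for_file(file_id):
    """Trigger one DAG run for one file via the experimental REST API."""
    # POST /api/experimental/dags/<dag_id>/dag_runs creates a new run;
    # the conf payload is available to tasks through the dag_run context.
    response = requests.post(
        f"{AIRFLOW_URL}/api/experimental/dags/{DAG_ID}/dag_runs",
        json={"conf": json.dumps({"file_id": file_id})},
    )
    response.raise_for_status()

trigger_dag_for_file("x.json")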

Cons: the REST API is still experimental, and I am not sure how Airflow would handle the load if many requests arrive at one point (which shouldn't happen, but what if it does?).

The second option: always use the same web service as described before, but this time it just stores the file. Then we have:


  • A first DAG: uses a FileSensor together with a TriggerDagRunOperator to trigger N DAG runs for N given files

  • A second DAG: performs tasks A -> B -> C on each file

Cons: you need to avoid the same file being sent to two different DAG runs. Example:

Files in folder: x.json. The sensor finds x and triggers DAG run (1).

The sensor goes back and is scheduled again. If DAG run (1) has not yet processed/moved the file, the sensor DAG might schedule a new DAG run with the same file, which is unwanted.
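A minimal sketch of the first DAG, assuming Airflow 1.10-style imports and hypothetical /data/inbox and /data/processing folders. Instead of a FileSensor plus TriggerDagRunOperator (which triggers a single fixed run per task), it uses one PythonOperator that claims the files and calls the trigger_dag API in a loop; moving each file out of the inbox before triggering is one way to avoid the double-trigger race described above:

import os
import shutil
from datetime import datetime

from airflow import DAG
from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.operators.python_operator import PythonOperator

INBOX = "/data/inbox"            # hypothetical landing folder
PROCESSING = "/data/processing"  # hypothetical staging folder

def claim_and_trigger(**context):
    """Move each file out of the inbox, then trigger one run per file."""
    for name in os.listdir(INBOX):
        dst = os.path.join(PROCESSING, name)
        # Moving first means a rescheduled run cannot see the same file again.
        shutil.move(os.path.join(INBOX, name), dst)
        trigger_dag(
            dag_id="process_file",  # hypothetical second DAG (tasks A -> B -> C)
            run_id=f"process_{name}_{datetime.utcnow().isoformat()}",
            conf={"file_path": dst},
        )

with DAG("file_watcher", start_date=datetime(2020, 1, 1),
         schedule_interval="*/5 * * * *", catchup=False) as dag:
    PythonOperator(
        task_id="claim_and_trigger",
        python_callable=claim_and_trigger,
        provide_context=True,
    )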

The third option: dynamically create the tasks for each file inside a single DAG, as shown in this question.

Cons: this could work, but what I dislike is that the UI will probably get messed up, because every DAG run will not look the same; it will change with the number of files being processed. Also, if there are 1000 files to be processed, the run would probably be very difficult to read.
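A minimal sketch of this shape, assuming the files can be listed at DAG-parse time (folder and task names are placeholders); this per-file loop is exactly what makes the graph, and therefore the UI, change from run to run:

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

INBOX = "/data/inbox"  # hypothetical landing folder

def process(file_path, **kwargs):
    print(f"processing {file_path}")  # tasks A/B/C/D would go here

with DAG("process_all_files", start_date=datetime(2020, 1, 1),
         schedule_interval="@hourly", catchup=False) as dag:
    # One task per file found at parse time: the DAG's shape follows
    # the folder's contents.
    for name in os.listdir(INBOX):
        PythonOperator(
            task_id=f"process_{name.replace('.', '_')}",
            python_callable=process,
            op_kwargs={"file_path": os.path.join(INBOX, name)},
        )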

The fourth option: subdags. I am not yet sure how they completely work, and I have seen that they are not encouraged (at the end), but it should be possible to spawn a subdag for each file and have it run. Similar to this question.

Cons: it seems that subdags can only be used with the sequential executor.
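For completeness, a minimal sketch of the subdag variant (Airflow 1.10-style SubDagOperator; folder and ids are placeholders). The files still have to be known at parse time, and each subdag's dag_id must be prefixed with the parent's:

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

INBOX = "/data/inbox"  # hypothetical landing folder
START = datetime(2020, 1, 1)

def file_subdag(parent_dag_id, child_id):
    """One subdag per file, holding the A -> B -> C chain."""
    subdag = DAG(dag_id=f"{parent_dag_id}.{child_id}",
                 start_date=START, schedule_interval="@hourly")
    a = DummyOperator(task_id="task_a", dag=subdag)
    b = DummyOperator(task_id="task_b", dag=subdag)
    c = DummyOperator(task_id="task_c", dag=subdag)
    a >> b >> c
    return subdag

with DAG("process_files_subdags", start_date=START,
         schedule_interval="@hourly", catchup=False) as dag:
    for name in os.listdir(INBOX):
        child = f"file_{name.replace('.', '_')}"
        SubDagOperator(task_id=child, subdag=file_subdag(dag.dag_id, child))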

Am I missing something and over-thinking something that should be (in my mind) quite straightforward? Thanks

Recommended answer

I found this article: https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13

where a new operator, namely TriggerMultiDagRunOperator, is used. I think this suits my needs.
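I have not copied the article's code here, but a rough home-grown equivalent of the idea, using only core Airflow 1.10 APIs, would be an operator that takes a callable producing one conf dict per file and triggers one run for each (the names and run_id scheme below are my own, not the article's):

from datetime import datetime

from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class TriggerMultiDagRunOperator(BaseOperator):
    """Triggers one run of `trigger_dag_id` per conf dict yielded by
    `python_callable` (which receives the task context)."""

    @apply_defaults
    def __init__(self, trigger_dag_id, python_callable, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.trigger_dag_id = trigger_dag_id
        self.python_callable = python_callable

    def execute(self, context):
        for i, conf in enumerate(self.python_callable(context)):
            trigger_dag(
                dag_id=self.trigger_dag_id,
                run_id=f"{self.trigger_dag_id}_{i}_{datetime.utcnow().isoformat()}",
                conf=conf,
            )

Paired with a callable that lists the landed files and yields one {"file_path": ...} dict per file, this gives one DAG run per file from a single task.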
