How to implement polling in Airflow?


Question

I want to use Airflow to implement data flows that periodically poll external systems (FTP servers, etc.), check for new files matching certain conditions, and then run a bunch of tasks for those files. I'm new to Airflow and read that Sensors are what you would use for this kind of case, and I actually managed to write a sensor that works OK when I run "airflow test" for it. But I'm a bit confused about how the sensor's poke_interval relates to the DAG scheduling. How should I define those settings for my use case? Or should I use some other approach? I just want Airflow to run the tasks when those files become available, and not flood the dashboard with failures when no new files are available for a while.

Answer

Your understanding is correct: a sensor is the way to go when you want to poll, either one of the existing sensors or one you implement yourself.
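If you implement your own sensor, most of the work is the condition check itself. The sketch below is a minimal, Airflow-free version of that check, assuming the files sit on an FTP server and that you already know which names were processed in earlier runs (how you persist that set is up to you). The function names list_remote_files, find_new_files and new_files_available are hypothetical; in Airflow you would call something like new_files_available from the poke(context) method of a BaseSensorOperator subclass, or pass it to PythonSensor as its python_callable.

```python
from fnmatch import fnmatchcase
from ftplib import FTP


def list_remote_files(host: str, directory: str) -> list[str]:
    """Hypothetical helper: list entries in `directory` on an FTP server."""
    with FTP(host) as ftp:
        ftp.login()  # anonymous login for illustration; real servers usually need user= and passwd=
        return ftp.nlst(directory)


def find_new_files(listing: list[str], pattern: str, already_processed: set[str]) -> list[str]:
    """Return, sorted, the names matching the glob `pattern` that were not processed yet."""
    return sorted(
        name
        for name in listing
        if fnmatchcase(name, pattern) and name not in already_processed
    )


def new_files_available(listing: list[str], pattern: str, already_processed: set[str]) -> bool:
    """Poke-style condition: True as soon as at least one new matching file exists."""
    return bool(find_new_files(listing, pattern, already_processed))
```

Keeping the FTP access separate from the matching logic means the condition can be tested without a server. For example, with a listing of incoming/a.csv, incoming/b.csv and incoming/readme.txt, the pattern *.csv and incoming/a.csv already processed, find_new_files returns only incoming/b.csv, so the sensor would succeed.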

Sensors are, however, always part of a DAG and do not execute outside its boundaries. DAG execution depends on start_date and schedule_interval, but you can combine this with a sensor to build a DAG whose work depends on the state of an external server. One possible approach is to start the whole DAG with a sensor that checks whether the condition has occurred and skips the rest of the DAG if it has not: setting the sensor's soft_fail parameter to True makes it mark itself as skipped instead of failed when it times out, and its downstream tasks are then skipped as well rather than failing.

Using the most frequent cron schedule (* * * * *) gives you a polling interval of one minute. If you really need a shorter polling time, you can tweak the sensor's poke_interval (how often it re-checks the condition within a single run) and timeout (how long it keeps checking before giving up) parameters.
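To make the relationship between poke_interval, timeout and soft_fail concrete, here is a toy, Airflow-free model of one scheduled run: a sensor that re-checks a condition every poke_interval seconds until timeout, followed by downstream tasks. It is a simplified sketch, not Airflow's actual implementation (Airflow's own loop differs in details such as exactly when the timeout check happens), and the task names download and process are hypothetical. Time is simulated, so nothing actually sleeps.

```python
from dataclasses import dataclass


@dataclass
class SensorResult:
    state: str      # "success", "skipped" or "failed"
    pokes: int      # how many times the condition was checked
    elapsed: float  # simulated seconds since the run started


def run_sensor(condition, poke_interval: float, timeout: float, soft_fail: bool) -> SensorResult:
    """Toy sensor: check condition(t) every poke_interval seconds until timeout is reached."""
    if poke_interval <= 0:
        raise ValueError("poke_interval must be positive")
    t = 0.0
    pokes = 0
    while True:
        pokes += 1
        if condition(t):
            return SensorResult("success", pokes, t)
        if t >= timeout:
            # Gave up: soft_fail turns the failure into a skip.
            return SensorResult("skipped" if soft_fail else "failed", pokes, t)
        t += poke_interval  # simulated sleep


def run_dag(condition, poke_interval, timeout, soft_fail, downstream=("download", "process")):
    """States of one toy DAG run: the sensor first, then its downstream tasks."""
    sensor = run_sensor(condition, poke_interval, timeout, soft_fail)
    if sensor.state == "success":
        downstream_state = "success"
    elif sensor.state == "skipped":
        downstream_state = "skipped"  # skips propagate under the default all_success trigger rule
    else:
        downstream_state = "upstream_failed"
    states = {"sensor": sensor.state}
    states.update({task: downstream_state for task in downstream})
    return states
```

With poke_interval=60 and timeout=300, a file that appears 150 seconds into the run is detected on the fourth check (t = 180 s). If no file ever appears, the run ends after six checks, with everything skipped when soft_fail=True, but with the sensor failed and the downstream tasks marked upstream_failed when it is False. In a real DAG, schedule_interval decides how often such a run starts, while poke_interval and timeout decide what happens inside it; keeping timeout no longer than the schedule interval (300 s for an assumed five-minute schedule) means each run gives up before the next one is due.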

Keep in mind, however, that Airflow itself probably does not guarantee exact execution times, so for very short polling intervals you may want to investigate alternatives (or at least consider approaches other than the one I've just shared).

