Airflow dags 生命周期事件 [英] Airflow dags lifecycle events

查看:48
本文介绍了Airflow dags 生命周期事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试通过 java 后端管理气流 dag(创建、执行等).目前,在创建 dag 并将其放置在气流的 dags 文件夹中后,我的后端一直在尝试运行 dag.但是直到它被气流调度程序接收到它才能运行它,如果 dag 的数量更多,这可能需要相当长的时间.我想知道是否有气流发出的任何事件,我可以点击这些事件来检查调度程序处理的新 dag,然后触发,从我的后端执行命令.或者有没有一种方法或配置让气流在处理它后会自动启动 dag 而不是我们触发它?

I am trying to manage airflow dags (create, execute etc.) via java backend. Currently after creating a dag and placing it in dags folder of airflow my backend is constantly trying to run the dag. But it can't run it until its picked up by airflow scheduler, which can take quite some time if the number of dags are more. I am wondering if there any events that airflow emits which I can tap to check for new dags processed by scheduler, and then trigger, execute command from my backend. Or is there a way or configuration where airflow will automatically start a dag once it processes it rather than we triggering it ?

推荐答案

有没有一种方法或配置,让气流在处理它后会自动启动一个 dag,而不是我们触发它?

is there a way or configuration where airflow will automatically start a dag once it processes it rather than we triggering it ?

是的,您可以定义的参数之一是 is_paused_upon_creation.

Yes, one of the parameters that you can define is is_paused_upon_creation.

如果您将 DAG 设置为:

If you set your DAG as:

DAG(
    dag_id='tutorial',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval="@daily",
    start_date=datetime(2020, 12, 28),
    is_paused_upon_creation=False
)

DAG 将在调度程序接收到后立即启动(假设满足运行它的条件)

The DAG will start as soon as picked up by the scheduler (assuming conditions to run it are met)

我想知道是否有气流发出的任何事件,我可以点击这些事件来检查调度程序处理的新 dag

I am wondering if there any events that airflow emits which I can tap to check for new dags processed by scheduler

在 Airflow >=2.0.0 中,您可以使用 API - 列出dags端点以获取dagbag中的所有dag

In Airflow >=2.0.0 you can use the API - list dags endpoint to get all dags that are in the dagbag

在任何 Airflow 版本中,您都可以使用此代码列出 dag_id:

In any Airflow version you can use this code to list the dag_ids:

from airflow.models import DagBag
print(DagBag().dag_ids())

这篇关于Airflow dags 生命周期事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆