Can an Airflow task dynamically generate a DAG at runtime?


Problem description

I have an upload folder that gets irregular uploads. For each uploaded file, I want to spawn a DAG that is specific to that file.

My first thought was to do this with a FileSensor that monitors the upload folder and, conditional on presence of new files, triggers a task that creates the separate DAGs. Conceptually:

Sensor_DAG (FileSensor -> CreateDAGTask)

|-> File1_DAG (Task1 -> Task2 -> ...)
|-> File2_DAG (Task1 -> Task2 -> ...)

In my initial implementation, CreateDAGTask was a PythonOperator that created DAG globals by placing them in the global namespace (see this SO answer), like so:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from datetime import datetime, timedelta
from pathlib import Path

UPLOAD_LOCATION = "/opt/files/uploaded"

# Dynamic DAG generation task code, for the Sensor_DAG below
def generate_dags_for_files(location=UPLOAD_LOCATION, **kwargs):
    dags = []
    for filepath in Path(location).glob('*'):
        dag_name = f"process_{filepath.name}"
        dag = DAG(dag_name, schedule_interval="@once", default_args={
            "depends_on_past": True,
            "start_date": datetime(2020, 7, 15),
            "retries": 1,
            "retry_delay": timedelta(hours=12)
        }, catchup=False)
        dag_task = DummyOperator(dag=dag, task_id=f"start_{dag_name}")

        dags.append(dag)

        # Try to place the DAG into globals(), which doesn't work
        globals()[dag_name] = dag

    return dags

The main DAG then invokes this logic via a PythonOperator:

# File-sensing DAG
default_args = {
    "depends_on_past" : False,
    "start_date"      : datetime(2020, 7, 16),
    "retries"         : 1,
    "retry_delay"     : timedelta(hours=5),
}
with DAG("Sensor_DAG", default_args=default_args,
         schedule_interval= "50 * * * *", catchup=False, ) as sensor_dag:

    start_task  = DummyOperator(task_id="start")
    stop_task   = DummyOperator(task_id="stop")
    sensor_task = FileSensor(task_id="my_file_sensor_task",
                             poke_interval=60,
                             filepath=UPLOAD_LOCATION)
    process_creator_task = PythonOperator(
        task_id="process_creator",
        python_callable=generate_dags_for_files,
    )
    start_task >> sensor_task >> process_creator_task >> stop_task

But that doesn't work, because by the time process_creator_task runs, the globals have already been parsed by Airflow. New globals after parse time are irrelevant.

Per Airflow dynamic DAG and task Ids, I can achieve what I'm trying to do by omitting the FileSensor task altogether and just letting Airflow generate the per-file DAGs at each scheduler heartbeat, replacing the Sensor_DAG with a plain module-level call to generate_dags_for_files. Update: never mind -- while this does create a DAG in the dashboard, actual execution runs into the "DAG seems to be missing" issue:

generate_dags_for_files()

This does mean that I can no longer regulate the frequency of folder polling with the poke_interval parameter of FileSensor; instead, Airflow will poll the folder every time it collects DAGs.

Is this the best pattern? Other related questions:

  • Run Airflow DAG for each file and Airflow: Proper way to run DAG for each file: identical use case, but the accepted answer uses two static DAGs, presumably with different parameters.
  • Proper way to create dynamic workflows in Airflow - accepted answer dynamically creates tasks, not DAGs, via a complicated XCom setup.

Recommended answer

In short: if the task writes where the DagBag reads from, yes, but it's best to avoid a pattern that requires this. Any DAG you're tempted to custom-create in a task should probably instead be a static, heavily parametrized, conditionally-triggered DAG. y2k-shubham provides an excellent example of such a setup, and I'm grateful for his guidance in the comments on this question.
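
As a concrete illustration of that recommendation (not y2k-shubham's actual setup), here is a minimal sketch of a single static, parametrized processing DAG, using the same Airflow 1.x-style imports as the question. The dag_id process_file, the conf key filepath, and the process callable are illustrative names, not anything from the original post. Each new file becomes one DagRun of this DAG, with the file path passed in through dag_run.conf -- for example via airflow trigger_dag process_file -c '{"filepath": "/opt/files/uploaded/foo.csv"}' on Airflow 1.10, or a TriggerDagRunOperator task in the sensor DAG.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def process(**kwargs):
    # The triggering DagRun carries the file path in its conf payload.
    filepath = kwargs["dag_run"].conf["filepath"]
    print(f"Processing {filepath}")

with DAG("process_file",
         start_date=datetime(2020, 7, 15),
         schedule_interval=None,    # never scheduled; runs only when triggered
         catchup=False) as process_dag:

    process_task = PythonOperator(
        task_id="process",
        python_callable=process,
        provide_context=True,       # Airflow 1.x; context is passed automatically in 2.x
    )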

That said, here are the approaches that would accomplish what the question is asking, however bad an idea it may be, in increasing order of ham-handedness:

  • If you dynamically generate DAGs from a Variable (like so), modify the Variable (a minimal sketch of this approach follows this list).
  • If you dynamically generate DAGs from a list of config files, add a new config file to wherever you're pulling config files from, so that a new DAG gets generated on the next DAG collection.
  • Use something like Jinja templating to write a new Python file in the dags/ folder.
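
To make the first option concrete, here is a minimal sketch assuming an Airflow Variable named file_list that holds a JSON list of file names (the Variable name and the per-file task are illustrative). The loop lives at module level in a file under dags/, so it runs on every DagBag collection and registers a DAG for whatever the Variable currently contains; the sensor DAG's task would then only call Variable.set() with the updated list instead of building DAG objects itself.

import json
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator

# Runs at parse time on every DagBag collection, so entries added to the
# "file_list" Variable become DAGs on the next parse.
for file_name in json.loads(Variable.get("file_list", default_var="[]")):
    dag_id = f"process_{file_name}"
    dag = DAG(dag_id,
              schedule_interval="@once",
              start_date=datetime(2020, 7, 15),
              catchup=False)
    with dag:
        DummyOperator(task_id=f"start_{dag_id}")
    # Module-level assignment is what makes the DagBag pick the DAG up.
    globals()[dag_id] = dag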

To retain access to the task after it runs, you'd have to keep the new DAG definition stable and accessible on future dashboard updates / DagBag collection. Otherwise, the Airflow dashboard won't be able to render much about it.
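
For the file-writing approach, that stability is what you get (ham-handedly) by rendering an actual .py file into the dags/ folder, so the definition survives future DagBag collections. A minimal sketch, assuming the dags folder lives at /opt/airflow/dags and using str.format in place of a real Jinja template; the template and helper name are illustrative:

from pathlib import Path

# Illustrative template; a real Jinja template file would be cleaner.
DAG_FILE_TEMPLATE = '''\
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG("process_{name}",
         schedule_interval="@once",
         start_date=datetime(2020, 7, 15),
         catchup=False) as dag:
    DummyOperator(task_id="start_process_{name}")
'''

def write_dag_file(file_name, dags_folder="/opt/airflow/dags"):
    """Persist a new DAG definition so future DagBag collections still see it."""
    target = Path(dags_folder) / f"process_{file_name}.py"
    target.write_text(DAG_FILE_TEMPLATE.format(name=file_name))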
