Dynamic tasks in Airflow based on an external file


Problem description

I am reading a list of elements from an external file and looping over the elements to create a series of tasks.

For example, if there are 2 elements in the file, [A, B], there will be 2 series of tasks:

A1 -> A2 ..
B1 -> B2 ...

This element-reading logic is not part of any task but lives in the DAG file itself. Thus the Scheduler calls it many times a day while parsing the DAG file. I want it to run only at DAG runtime.

Is there an established pattern for this kind of use case?
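For reference, the pattern the question describes can be sketched as follows in plain Python (the file name `elements.json` and the task-name scheme are illustrative, not from the question; in the real DAG each name would be a PythonOperator chained as `A1 >> A2`):

```python
import json
from pathlib import Path

# Stand-in for the external file described above.
Path('elements.json').write_text(json.dumps({"elements": ["A", "B"]}))

# This read happens at DAG-file top level, so the Scheduler re-executes
# it on every parse of the file -- the behaviour the question wants to avoid.
elements = json.loads(Path('elements.json').read_text())['elements']

# One series of tasks per element, e.g. A1 -> A2 and B1 -> B2.
task_series = {e: [f'{e}1', f'{e}2'] for e in elements}
print(task_series)   # {'A': ['A1', 'A2'], 'B': ['B1', 'B2']}
```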

Solution

Depending on your requirements: if what you are looking for is to avoid reading a file many times, but you don't mind reading from the metadata database that often instead, then you could change your approach and use Variables as the source of iteration to dynamically create tasks.

A basic example could be performing the file read inside a PythonOperator and setting the Variable you will iterate over later on (in the same callable):

sample_file.json:

{
    "cities": [ "London", "Paris", "BA", "NY" ]
}

Task definition:


import json

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.utils.task_group import TaskGroup


def _read_file():
    # Read the file once, at task runtime, and persist the list as a Variable
    with open('dags/sample_file.json') as f:
        data = json.load(f)
        Variable.set(key='list_of_cities',
                     value=data['cities'], serialize_json=True)
        print('Loading Variable from file...')


def _say_hello(city_name):
    # Callable used by the dynamic tasks below (body is illustrative)
    print(f'Hello from {city_name}!')


with DAG('dynamic_tasks_from_var', schedule_interval='@once',
         start_date=days_ago(2),
         catchup=False) as dag:

    read_file = PythonOperator(
        task_id='read_file',
        python_callable=_read_file
    )

Then you can read that Variable and create the dynamic tasks. (It's important to set a default_var.) The TaskGroup is optional.

    # Top-level code: executed by the Scheduler on every parse of the DAG file
    updated_list = Variable.get('list_of_cities',
                                default_var=['default_city'],
                                deserialize_json=True)
    print(f'Updated LIST: {updated_list}')

    with TaskGroup('dynamic_tasks_group',
                   prefix_group_id=False,
                   ) as dynamic_tasks_group:

        for index, city in enumerate(updated_list):
            say_hello = PythonOperator(
                task_id=f'say_hello_from_{city}',
                python_callable=_say_hello,
                op_kwargs={'city_name': city}
            )

    # DAG-level dependencies
    read_file >> dynamic_tasks_group

In the Scheduler logs, you will only find:

INFO - Updated LIST: ['London', 'Paris', 'BA', 'NY']

DAG Graph View (screenshot omitted):

With this approach, the only top-level code (and hence what the Scheduler executes continuously) is the call to the Variable.get() method. If you need to read many variables, remember that it's recommended to store them in one single JSON value, to avoid constantly creating connections to the metadata database.
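A minimal sketch of that recommendation (the Variable name `dag_config` and its keys are made up for illustration): rather than one Variable per setting, store one JSON blob so a single `Variable.get(..., deserialize_json=True)` call — one metadata-database query — returns every setting. Since `deserialize_json=True` is equivalent to `json.loads` on the stored value, plain `json` shows the idea without an Airflow installation:

```python
import json

# With Airflow available, you would store this once:
#   Variable.set('dag_config', config, serialize_json=True)
# and read it back with ONE metadata-database query:
#   config = Variable.get('dag_config', deserialize_json=True)
config_json = json.dumps({
    "cities": ["London", "Paris", "BA", "NY"],
    "source_path": "dags/sample_file.json",
    "retries": 2,
})

# deserialize_json=True behaves like json.loads on the stored string.
config = json.loads(config_json)
print(config["cities"])
```

One round-trip yields every setting, instead of one connection per `Variable.get()` call.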
