Create dynamic workflows in Airflow with XCOM value


Problem Description


Now, I create multiple tasks using a variable like this, and it works fine:

with DAG(....) as dag:
    body = Variable.get("config_table", deserialize_json=True)
    for i in range(len(body.keys())):
        simple_task = Operator(
            task_id = 'task_' + str(i),
            .....

But for some reason I need to use an XCom value instead of a variable. Is it possible to dynamically create tasks from an XCom pull value?

I tried to set the value like this, but it does not work:

body = "{{ ti.xcom_pull(key='config_table', task_ids='get_config_table') }}"
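A note on why that fails: Jinja templates are only rendered at task execution time, for an operator's templated fields. At DAG-parse time, `body` is just the literal template string, so there is nothing to iterate over. A minimal stdlib-only sketch:

```python
# At DAG-parse time the Jinja template is NOT rendered; `body` is simply
# the literal template text, not the XCom value it refers to.
body = "{{ ti.xcom_pull(key='config_table', task_ids='get_config_table') }}"

print(type(body).__name__)    # str
print(body.startswith("{{"))  # True — still raw template text
```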

Solution

It's possible to dynamically create tasks from XComs generated by a previous task; there are more extensive discussions on this topic, for example in this question. One of the suggested approaches follows this structure; here is a working example I made:

sample_file.json:

{
    "cities": [ "London", "Paris", "BA", "NY" ]
}
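For reference, the shape of the data the tasks below work with can be checked with plain `json` (the file content is inlined here as a string rather than read from `dags/sample_file.json`, purely for the sketch):

```python
import json

# Same content as dags/sample_file.json, inlined for the sketch.
sample = '{"cities": ["London", "Paris", "BA", "NY"]}'
data = json.loads(sample)

print(data["cities"])  # ['London', 'Paris', 'BA', 'NY']
```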

  • Get your data from an API, a file, or any other source, and push it as an XCom.


def _process_obtained_data(ti):
    list_of_cities = ti.xcom_pull(task_ids='get_data')
    Variable.set(key='list_of_cities',
                 value=list_of_cities['cities'], serialize_json=True)

def _print_greeting(city_name, greeting):
    # Definition implied by the op_kwargs used below
    print(f'{greeting} from {city_name}')

def _read_file():
    with open('dags/sample_file.json') as f:
        data = json.load(f)
        # push to XCom using return
        return data


with DAG('dynamic_tasks_example', schedule_interval='@once',
         start_date=days_ago(2),
         catchup=False) as dag:

    get_data = PythonOperator(
        task_id='get_data',
        python_callable=_read_file)

  • Add a second task which will pull from XCom and set a Variable with the data you will use to iterate over later on.

    preparation_task = PythonOperator(
        task_id='preparation_task',
        python_callable=_process_obtained_data)

*Of course, you can merge both tasks into one if you want. I prefer not to, because I usually take only a subset of the fetched data to create the Variable.
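To see the hand-off between the two tasks without running Airflow, here is a stand-in sketch: `FakeTI` and the in-memory `variables` dict are hypothetical stand-ins for the task instance and the Variable table, not real Airflow APIs.

```python
# Hypothetical stand-ins for Airflow's task instance and Variable storage.
class FakeTI:
    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids):
        return self._xcoms[task_ids]

variables = {}  # stand-in for the Variable table

def _process_obtained_data(ti):
    # Mirrors the real callable: pull the XCom pushed by get_data and
    # persist the subset we iterate on as a "Variable".
    list_of_cities = ti.xcom_pull(task_ids='get_data')
    variables['list_of_cities'] = list_of_cities['cities']

ti = FakeTI({'get_data': {'cities': ['London', 'Paris', 'BA', 'NY']}})
_process_obtained_data(ti)
print(variables['list_of_cities'])  # ['London', 'Paris', 'BA', 'NY']
```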

  • Read from that Variable and iterate over it later. It's critical to define default_var.

    end = DummyOperator(
        task_id='end',
        trigger_rule='none_failed')

    # Top-level code within DAG block
    iterable_list = Variable.get('list_of_cities',
                                 default_var=['default_city'],
                                 deserialize_json=True)
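The `default_var` fallback can be illustrated with a small stand-in (`get_variable` here is a hypothetical sketch, not the real Airflow API): on the very first parse the Variable does not exist yet, and without a default the DAG file would fail to parse.

```python
import json

def get_variable(store, key, default_var=None, deserialize_json=False):
    # Hypothetical stand-in for Variable.get: fall back to default_var
    # when the key is missing (e.g. first parse, before preparation_task ran).
    raw = store.get(key)
    if raw is None:
        return default_var
    return json.loads(raw) if deserialize_json else raw

empty_db = {}  # nothing stored yet on the first parse
print(get_variable(empty_db, 'list_of_cities',
                   default_var=['default_city'],
                   deserialize_json=True))  # ['default_city']

populated_db = {'list_of_cities': json.dumps(['London', 'Paris'])}
cities = get_variable(populated_db, 'list_of_cities',
                      default_var=['default_city'], deserialize_json=True)
print(cities)  # ['London', 'Paris']
```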

  • Declare the dynamic tasks and their dependencies within a loop. Make each task_id unique. TaskGroup is optional; it helps you keep the UI organized.


    with TaskGroup('dynamic_tasks_group',
                   prefix_group_id=False,
                   ) as dynamic_tasks_group:
        if iterable_list:
            for index, city in enumerate(iterable_list):
                say_hello = PythonOperator(
                    task_id=f'say_hello_from_{city}',
                    python_callable=_print_greeting,
                    op_kwargs={'city_name': city, 'greeting': 'Hello'}
                )
                say_goodbye = PythonOperator(
                    task_id=f'say_goodbye_from_{city}',
                    python_callable=_print_greeting,
                    op_kwargs={'city_name': city, 'greeting': 'Goodbye'}
                )

                # TaskGroup level dependencies
                say_hello >> say_goodbye

# DAG level dependencies
get_data >> preparation_task >> dynamic_tasks_group >> end
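Since the loop derives each `task_id` from the iterable, uniqueness matters: Airflow refuses to add a task whose id already exists in the DAG. A quick stdlib-only check of the naming scheme used above:

```python
# Reproduce the task_id scheme from the loop and verify the ids are unique.
iterable_list = ['London', 'Paris', 'BA', 'NY']
task_ids = [f'say_hello_from_{city}' for city in iterable_list]
task_ids += [f'say_goodbye_from_{city}' for city in iterable_list]

print(len(task_ids) == len(set(task_ids)))  # True — no duplicate ids
```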

DAG Graph View: (screenshot not included)

Imports:

import json
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup

Things to keep in mind:

  • If you have simultaneous dag_runs of this same DAG, all of them will use the same variable, so you may need to make it 'unique' by differentiating their names.
  • You must set a default value when reading the Variable; otherwise, the first execution may not be processable by the Scheduler.
  • The Airflow Graph View UI may not refresh the changes immediately. This happens especially on the first run after adding or removing items from the iterable that drives the dynamic task generation.
  • If you need to read many variables, remember that it's recommended to store them in one single JSON value, to avoid constantly creating connections to the metadata database (example in this article).
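The last point above (one JSON value instead of many Variables) can be sketched like this; in real code the single `json.loads` would be one `Variable.get(..., deserialize_json=True)`, i.e. one metadata-DB query instead of several. The config keys and values are hypothetical.

```python
import json

# One JSON blob holding related settings -> one Variable, one DB query.
config_blob = json.dumps({
    'source': 's3://my-bucket/input',  # hypothetical values for the sketch
    'retries': 3,
    'cities': ['London', 'Paris'],
})

# In Airflow this would be: Variable.get('config', deserialize_json=True)
config = json.loads(config_blob)
print(config['retries'])  # 3
```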

Good luck!

Edit:

Another important point to take into consideration:

  • With this approach, the call to the Variable.get() method is top-level code, so it is read by the scheduler every 30 seconds (the default of the min_file_process_interval setting). This means a connection to the metadata DB is created each time.

Edit:

  • Added an if clause to handle the empty iterable_list case.
