Create dynamic tasks depending on the result of an SQL query in Airflow

Problem description

I am trying to create dynamic tasks with a TaskGroup, based on a result that I save in an Airflow Variable. The Variable is updated every N minutes from a database query, but the second time it is modified the scheduler breaks down.

Basically, I need to create tasks based on the number of unique rows returned by the query.

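# Imports assumed for Airflow 2.x; the original post omitted them.
import pandas as pd
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

with TaskGroup(f"task") as task:
    # Read in top-level code, so this lookup runs every time the file is parsed.
    data = Variable.get("df")

    try:
        if data != False and data != 'none':
            df = pd.read_json(data)

            # One pair of tasks per unique value of the "field" column.
            for field_id in df.field.unique():
                task1 = PythonOperator(
                    # arguments omitted in the original post
                )
                task2 = PythonOperator(
                    # arguments omitted in the original post
                )

                task1 >> task2

    except:
        pass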

Is there a way to do this with a TaskGroup?

Recommended answer

This is an anti-pattern in Airflow.

While you can call Variable.get("df") in top-level code, you shouldn't. Variables, Connections, and any other code that queries a database should be used only inside an operator's scope or through Jinja templating. The reason is that Airflow parses the DAG file periodically (every 30 seconds, unless you changed the default of min_file_process_interval), so code that interacts with the database at parse time will hit that database every 30 seconds and put a heavy load on it. For some of these cases, future Airflow versions will emit a warning (see PR).
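As a rough illustration of keeping the lookup out of top-level code, the sketch below passes the Variable to a single task through a Jinja template, so it is read when the task runs rather than on every parse. Several things here are assumptions and not part of the original post: the helper names `unique_field_ids`, `process_fields`, and `build_dag`, the DAG id and start date, Airflow 2.x import paths, and a Variable holding JSON shaped like `{"field": {"0": ..., "1": ...}}`. It also swaps `pd.read_json` for the standard-library `json` module to keep the example self-contained.

```python
import json


def unique_field_ids(raw):
    """Return the distinct values of the "field" column, in first-seen order.

    Assumes `raw` is JSON shaped like {"field": {"0": ..., "1": ...}}.
    """
    # Mirror the question's guard: an empty value or 'none' means no data.
    if not raw or raw == "none":
        return []
    columns = json.loads(raw)
    # dict.fromkeys drops duplicates while preserving insertion order.
    return list(dict.fromkeys(columns.get("field", {}).values()))


def process_fields(raw):
    """Task callable: handle every unique field inside one static task."""
    field_ids = unique_field_ids(raw)
    for field_id in field_ids:
        # Placeholder for the per-field work done by task1/task2 in the question.
        print(f"processing field {field_id}")
    return field_ids


def build_dag():
    """Build the DAG; Airflow is imported here so the helpers above work without it."""
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="process_fields",  # hypothetical DAG id
        start_date=datetime(2021, 1, 1),
        catchup=False,
    ) as dag:
        PythonOperator(
            task_id="process_fields",
            python_callable=process_fields,
            # Rendered by Jinja when the task runs, not when the file is parsed.
            op_kwargs={"raw": "{{ var.value.get('df', 'none') }}"},
        )
    return dag
```

In a DAG file, a module-level `dag = build_dag()` would expose the DAG to the scheduler. The graph then contains the same single task on every parse, and only the data that task loops over changes between runs.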

Airflow tasks should be as static as possible (or slowly changing).
