Airflow dynamic tasks at runtime

Question

Other questions about 'dynamic tasks' seem to address dynamic construction of a DAG at schedule or design time. I'm interested in dynamically adding tasks to a DAG during execution.

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG('test_dag', description='a test',
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 1, 1),
          catchup=False)

def make_tasks():
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1 >> du2 >> du3

p = PythonOperator(
    task_id='python_operator',
    dag=dag,
    python_callable=make_tasks)

This naive implementation doesn't seem to work: the dummy tasks never show up in the UI.

What's the correct way to add new operators to the DAG during execution? Is it possible?

Recommended answer

It is not possible to modify the DAG during its execution (without a lot more work).

The dag = DAG(... object is picked up in a loop by the scheduler. It will have the task instance 'python_operator' in it. That task instance gets scheduled in a DAG run and executed by a worker or executor. Since DAG models in the Airflow DB are only updated by the scheduler, the dummy tasks added inside the callable will not be persisted to the DAG, nor scheduled to run; they will be forgotten when the worker exits. You could copy all the code from the scheduler that persists and updates the model, but that would be undone the next time the scheduler visits the DAG file for parsing, which could happen once a minute, once a second, or faster, depending on how many other DAG files there are to parse.

Airflow actually wants each DAG to keep approximately the same layout between runs. It also wants to reload and parse DAG files constantly. So although you could make a DAG file that on each run determines its tasks dynamically based on some external data (preferably cached in a file or pyc module rather than fetched over network I/O such as a DB lookup, which would slow down the whole scheduling loop for all the DAGs), it's not a good plan: your graph and tree views will get confusing, and the scheduler's parsing will be more taxed by that lookup.
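To make the "cached in a file" variant concrete, here is a minimal sketch of the lookup a DAG file could do at parse time. The helper name load_task_names and the JSON-list file format are assumptions for illustration, not part of Airflow; the point is only that the lookup is a cheap local read that falls back to a default instead of failing the parse.

```python
import json


def load_task_names(path, default=()):
    """Return a list of task names from a local JSON file.

    Hypothetical helper: falls back to `default` if the file is missing,
    unreadable, or does not contain a JSON list, so a bad cache file
    never breaks DAG parsing.
    """
    try:
        with open(path) as fh:
            names = json.load(fh)
    except (OSError, ValueError):
        # Missing file or invalid JSON (JSONDecodeError is a ValueError).
        return list(default)
    if not isinstance(names, list):
        return list(default)
    return [str(name) for name in names]


# In the DAG file this could be used at module level, for example:
#
#     for name in load_task_names('/path/to/cached_tasks.json'):  # assumed path
#         DummyOperator(task_id=name, dag=dag)
```

The caveats above still apply: every change to the cached file changes the DAG's layout, so the graph and tree views can become inconsistent between runs.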

You could make the callable run each task…

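def make_tasks(**context):
    # Build the operators inside the callable and execute them inline, in order.
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1.execute(context)
    du2.execute(context)
    du3.execute(context)

p = PythonOperator(
    task_id='python_operator',
    dag=dag,
    provide_context=True,  # pass the task context to the callable as keyword arguments
    python_callable=make_tasks)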

But that's sequential, so you would have to work out how to make the calls parallel in Python (with futures, perhaps?), and if any of them raises an exception the whole task fails. It is also bound to one executor or worker, so it doesn't use Airflow's task distribution (Kubernetes, Mesos, Celery).
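As a rough illustration of the futures idea, the sketch below runs independent steps on a thread pool from inside a single callable, using only the standard library. The helper run_in_parallel and the lambda steps are hypothetical stand-ins, and this only makes sense for steps that do not depend on each other (unlike the du1 >> du2 >> du3 chain in the question).

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_in_parallel(steps, max_workers=4):
    """Run each zero-argument callable in `steps` (a name -> callable dict).

    Returns (results, errors), two dicts keyed by step name, so one failing
    step does not hide the outcome of the others.
    """
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(fn): name for name, fn in steps.items()}
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception as exc:
                # Record the failure and keep collecting the other steps.
                errors[name] = exc
    return results, errors


def make_tasks(**context):
    # Hypothetical stand-ins for the real work of each step.
    steps = {
        'dummy1': lambda: 'done1',
        'dummy2': lambda: 'done2',
        'dummy3': lambda: 'done3',
    }
    results, errors = run_in_parallel(steps)
    if errors:
        # Fail the single Airflow task if any step failed.
        raise RuntimeError('steps failed: %s' % sorted(errors))
    return results
```

Even with this, everything still runs inside one task on one worker, so Airflow sees a single success or failure and cannot retry or distribute the individual steps.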

The other way to work with this is to add a fixed number of tasks (the maximal number) and use the callable(s) to short-circuit the unneeded tasks, or push arguments with XCom for each of them, changing their behavior at run time without changing the DAG.
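The decision logic for the fixed-slot approach might look like the sketch below. MAX_SLOTS, plan_slots and slot_should_run are assumed names for illustration; the Airflow wiring is described in comments only, so the snippet itself is plain Python.

```python
MAX_SLOTS = 5  # assumed upper bound on work items in any single run


def plan_slots(work_items, max_slots=MAX_SLOTS):
    """Assign work items to a fixed number of slots.

    Returns a list of length `max_slots`; unused slots hold None.
    A plain list is used so the plan survives JSON-style serialization.
    """
    items = list(work_items)
    if len(items) > max_slots:
        raise ValueError(
            '%d items but only %d slots' % (len(items), max_slots))
    return items + [None] * (max_slots - len(items))


def slot_should_run(plan, slot):
    """Return True if the task in this slot has work to do."""
    return plan[slot] is not None


# Possible wiring, assuming Airflow 1.x as in the question:
# - one upstream PythonOperator returns plan_slots(...); its return value
#   is pushed to XCom
# - for each slot in range(MAX_SLOTS), a ShortCircuitOperator pulls the plan
#   from XCom and returns slot_should_run(plan, slot); a falsy result skips
#   the tasks downstream of that slot
# - the worker task in each slot reads plan[slot] as its argument
```

The DAG always contains MAX_SLOTS branches, so its layout stays stable between runs; only which branches are skipped changes.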
