Launch a subdag with variable parallel tasks in Airflow


Problem description

I have an Airflow workflow that I'd like to modify (see the illustration at the bottom).
However, I couldn't find a way to do that in the docs.

I've looked at subdags, branching, and XComs without luck.

There doesn't seem to be a way to specify how many tasks should run in parallel in a subdag based on the return value of an operator.
To add to the problem, each task in the subdag receives a different parameter (an element from the list returned by the previous operator).
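To make the requirement concrete outside Airflow: the desired behaviour is a fan-out whose width is decided at run time by an upstream step's return value, with each parallel task receiving one element of that list. The following is a minimal plain-Python sketch, not Airflow code. It assumes hypothetical `get_items` and `process` functions that stand in for the upstream operator and the per-element subdag, and it uses `concurrent.futures` threads in place of Airflow workers.

```python
from concurrent.futures import ThreadPoolExecutor


def get_items():
    """Hypothetical upstream step: its return value decides the fan-out width."""
    return ["foo", "test", "hi"]


def process(item):
    """Hypothetical per-element work, standing in for one parallel subdag task."""
    return item.upper()


def fan_out(items, worker):
    """Run one task per element of `items` in parallel, preserving input order."""
    if not items:
        # ThreadPoolExecutor rejects max_workers=0, so handle the empty case here
        return []
    with ThreadPoolExecutor(max_workers=len(items)) as pool:
        # pool.map submits one call per element and yields results in input order
        return list(pool.map(worker, items))


if __name__ == "__main__":
    print(fan_out(get_items(), process))  # ['FOO', 'TEST', 'HI']
```

Here the number of parallel tasks (three) only becomes known when `get_items()` returns. That run-time property is what the question is asking a DAG to express.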

This is an illustration of what I'm trying to do:

Recommended answer

I've run into this as well and haven't really found a clean way to address it. If you know all the possible parameters you would pass to each subdag, you can hardcode them into the DAG file and always create the DAG with every possible subdag. Then add an operator (similar to your "get every n") that fetches the list of subdags you want to run and marks every downstream subdag not in that list as skipped. Something like this:

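# Airflow 1.10-style imports (provide_context=True below is also Airflow 1.x)
from airflow.models import TaskInstance
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.settings import Session
from airflow.utils import timezone
from airflow.utils.state import State

# Every subdag that could ever be selected, hardcoded when the file is parsed
SUBDAGS = {
    'a': {'id': 'foo'},
    'b': {'id': 'bar'},
    'c': {'id': 'test'},
    'd': {'id': 'hi'},
}


def _select_subdags(**context):
    names = fetch_list()  # your own lookup, defined elsewhere; returns e.g. ["a", "c", "d"]
    tasks_to_skip = ['my_subdag_' + name for name in set(SUBDAGS) - set(names)]

    session = Session()
    try:
        # This run's task instances for the subdags that were not selected
        tis = session.query(TaskInstance).filter(
            TaskInstance.dag_id == context['dag'].dag_id,
            TaskInstance.execution_date == context['ti'].execution_date,
            TaskInstance.task_id.in_(tasks_to_skip),
        )
        # Airflow 1.10+ rejects naive datetimes; timezone.utcnow() is UTC-aware
        now = timezone.utcnow()
        for ti in tis:
            # Mark each unselected subdag SKIPPED so the scheduler never runs it
            ti.state = State.SKIPPED
            ti.start_date = now
            ti.end_date = now
            session.merge(ti)
        session.commit()
    finally:
        session.close()


select_subdags = PythonOperator(
    task_id='select_subdags',
    dag=dag,  # the parent DAG, defined elsewhere in this file
    provide_context=True,  # pass the task context to the callable as **kwargs
    python_callable=_select_subdags,
)

# Always create one SubDagOperator per possible subdag, downstream of the selector
for name, params in SUBDAGS.items():
    child_dag_id = 'my_subdag_' + name
    subdag_op = SubDagOperator(
        task_id=child_dag_id,
        dag=dag,
        # my_subdag() is your subdag factory (defined elsewhere); SubDagOperator
        # expects the child's dag_id to be '<parent_dag_id>.<task_id>'
        subdag=my_subdag(dag.dag_id, child_dag_id, params),
    )
    select_subdags >> subdag_op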
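The selection logic inside `_select_subdags` can be checked on its own, without Airflow or a database. The following is a minimal plain-Python sketch. It assumes the same `SUBDAGS` mapping and `my_subdag_` task-id prefix as above. `plan_states` is a hypothetical helper: instead of writing `TaskInstance` rows, it returns the state each SubDagOperator would end up in.

```python
SUBDAGS = {
    'a': {'id': 'foo'},
    'b': {'id': 'bar'},
    'c': {'id': 'test'},
    'd': {'id': 'hi'},
}
PREFIX = 'my_subdag_'


def plan_states(subdags, selected):
    """Map each subdag task_id to 'run' or 'skipped' for one DAG run.

    Mirrors the set difference in _select_subdags: every hardcoded subdag
    whose name is not in `selected` is skipped.
    """
    selected = set(selected)
    unknown = selected - set(subdags)
    if unknown:
        # Design choice (not in the original): the set difference would
        # silently ignore names that have no hardcoded subdag.
        raise ValueError('no subdag defined for: %s' % sorted(unknown))
    return {
        PREFIX + name: ('run' if name in selected else 'skipped')
        for name in sorted(subdags)
    }


if __name__ == '__main__':
    print(plan_states(SUBDAGS, ['a', 'c', 'd']))
    # {'my_subdag_a': 'run', 'my_subdag_b': 'skipped', 'my_subdag_c': 'run', 'my_subdag_d': 'run'}
```

Raising on unknown names is a deliberate departure from the code above. Without it, a name returned by `fetch_list()` that has no hardcoded subdag simply never runs, and nothing reports the problem.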

Obviously this is not ideal, especially when you end up wanting to run just one subdag out of hundreds. We've also run into performance issues with thousands of subdags in a single DAG, because every run creates a task instance for each of them, the majority of which are simply skipped.
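To put rough numbers on that overhead: in this pattern the parent DAG holds one selector task plus one SubDagOperator per hardcoded subdag. Every run therefore creates that many parent-level task instances, no matter how many subdags are actually selected. The following is a back-of-the-envelope sketch. It assumes that skipped SubDagOperators never start their child DAGs, so only selected subdags add inner task instances. `task_instance_counts` and the fixed `tasks_per_subdag` are hypothetical simplifications.

```python
def task_instance_counts(n_subdags, n_selected, tasks_per_subdag):
    """Estimate task instances created by one run of the hardcoded-subdag DAG.

    Assumes 1 selector task + 1 SubDagOperator per hardcoded subdag in the
    parent DAG, and that only selected subdags run their inner tasks.
    """
    if not 0 <= n_selected <= n_subdags:
        raise ValueError('n_selected must be between 0 and n_subdags')
    parent = 1 + n_subdags                  # selector + every SubDagOperator
    skipped = n_subdags - n_selected        # marked SKIPPED by the selector
    inner = n_selected * tasks_per_subdag   # tasks inside the subdags that run
    return {'parent': parent, 'skipped': skipped, 'inner': inner,
            'total': parent + inner}


if __name__ == '__main__':
    # One subdag wanted out of a thousand, five tasks each
    print(task_instance_counts(1000, 1, 5))
    # {'parent': 1001, 'skipped': 999, 'inner': 5, 'total': 1006}
```

Under these assumptions, 999 of the 1,006 task instances in that run exist only to be skipped.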
