Airflow 1.10.3 SubDag即使并发性为8也只能并行运行1个任务 [英] Airflow 1.10.3 SubDag can only run 1 task in parallel even the concurrency is 8

查看:4
本文介绍了Airflow 1.10.3 SubDag即使并发性为8也只能并行运行1个任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

最近,我将Airflow从1.9升级到1.10.3(最新版本)。

但是,我确实注意到了与SubDag并发性相关的性能问题。只能代答SubDag内的1个任务,这不是应有的方式,我们的SubDag的并发设置为8。

请参阅以下内容: get_monthly_summary-214get_monthly_summary-215是两个子DAG,可以通过父DAG并发在并行控制器中运行

但当放大到SubDag时,请说get_monthly_summary-214,然后 你可以清楚地看到,一次只有一个任务在运行,其他任务都在排队,它一直以这种方式运行。当我们检查SubDag并发性时,它实际上是我们在代码中指定的8:

我们确实设置了池槽大小,它是32,我们确实有8个芹菜工人来接手排队的任务,并且我们与并发关联的气流配置如下:

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
worker_concurrency = 16
此外,所有SubDag都是使用名为mini的队列配置的,而它的所有内部任务都是名为default的默认队列,因为如果我们在同一队列上同时运行SubDag运算符和SubDag内部任务,可能会有一些deadlock problems。我还尝试为所有任务和操作员使用default队列,但没有帮助。

旧版本1.9似乎很好,每个SubDag可以并行执行多个任务,我们有什么遗漏吗?

推荐答案

基于上面发布的@kaxil的发现,如果您仍然希望并行执行子DAG中的任务,解决方案是创建一个包装函数以显式传递executorWhen构造SubDagOperator

from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors import GetDefaultExecutor

def sub_dag_operator_with_default_executor(subdag, *args, **kwargs):
    return SubDagOperator(subdag=subdag, executor=GetDefaultExecutor(), *args, **kwargs)
在创建subdag操作符时调用sub_dag_operator_with_default_executor。为了减轻子DAG操作员的工作量performance concerns

我们应该将subdag_OPERATOR的默认执行器更改为SequentialExecutor。子操作员不支持气流池,因此它可能会消耗所有的工作资源(例如,在celeryExecutor中)。这会导致Airflow-74中提到的问题,并限制subdag_OPERATOR的使用。我们通过指定使用顺序执行程序来在生产中使用subdag_OPERATOR。

我们建议创建一个特殊的队列(我们在本例中指定Queue=‘mini’)和芹菜工人来处理subdag_OPERATOR,这样它就不会消耗您普通芹菜工人的所有资源。具体如下:

 dag = DAG(
    dag_id=DAG_NAME,
    description=f"{DAG_NAME}-{__version__}",
    ...
)    
with dag:
        ur_operator = sub_dag_operator_with_default_executor(
                task_id=f"your_task_id",
                subdag=load_sub_dag(
                    parent_dag_name=DAG_NAME,
                    child_dag_name=f"your_child_dag_name",
                    args=args,
                    concurrency=dag_config.get("concurrency_in_sub_dag") or DEFAULT_CONCURRENCY,
                ),
                queue="mini",
                dag=dag
            )

那么当您创建您的特殊芹菜工人(我们使用的是2核和3G内存这样的轻量级主机)时,指定AIRFLOW__CELERY__DEFAULT_QUEUEmini,取决于您希望并行运行多少子DAG操作符,您应该创建多个专门的芹菜工人来负载均衡资源,我们建议每个特殊的芹菜工人一次最多注意2个子DAG操作符,否则会耗尽(例如,在2核和3G内存主机上内存用完)

您还可以通过在Airflow UIVariables配置页面中创建的ENV VARconcurrency_in_sub_dag在Subdag内部调整concurrency

更新[22/05/2020]以上仅适用于气流(<;=1.10.3,>=1.10.0) 对于超过1.10.3的气流,请使用

from airflow.executors import get_default_executor

相反。

这篇关于Airflow 1.10.3 SubDag即使并发性为8也只能并行运行1个任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆