Airflow 1.10.3 SubDag即使并发性为8也只能并行运行1个任务 [英] Airflow 1.10.3 SubDag can only run 1 task in parallel even the concurrency is 8
问题描述
最近,我将Airflow从1.9升级到1.10.3(最新版本)。
但是,我确实注意到了与SubDag并发性相关的性能问题。只能代答SubDag内的1个任务,这不是应有的方式,我们的SubDag的并发设置为8。 请参阅以下内容:get_monthly_summary-214
和get_monthly_summary-215
是两个子DAG,可以通过父DAG并发在并行控制器中运行
但当放大到SubDag时,请说get_monthly_summary-214
,然后
你可以清楚地看到,一次只有一个任务在运行,其他任务都在排队,它一直以这种方式运行。当我们检查SubDag并发性时,它实际上是我们在代码中指定的8:
我们确实设置了池槽大小,它是32,我们确实有8个芹菜工人来接手排队的任务,并且我们与并发关联的气流配置如下:
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor
# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
worker_concurrency = 16
此外,所有SubDag都是使用名为mini
的队列配置的,而它的所有内部任务都是名为default
的默认队列,因为如果我们在同一队列上同时运行SubDag运算符和SubDag内部任务,可能会有一些deadlock problems。我还尝试为所有任务和操作员使用default
队列,但没有帮助。
旧版本1.9似乎很好,每个SubDag可以并行执行多个任务,我们有什么遗漏吗?
推荐答案
基于上面发布的@kaxil的发现,如果您仍然希望并行执行子DAG中的任务,解决方案是创建一个包装函数以显式传递executor
When构造SubDagOperator
:
from airflow.operators.subdag_operator import SubDagOperator
from airflow.executors import GetDefaultExecutor
def sub_dag_operator_with_default_executor(subdag, *args, **kwargs):
return SubDagOperator(subdag=subdag, executor=GetDefaultExecutor(), *args, **kwargs)
在创建subdag操作符时调用sub_dag_operator_with_default_executor
。为了减轻子DAG操作员的工作量performance concerns
我们建议创建一个特殊的队列(我们在本例中指定Queue=‘mini’)和芹菜工人来处理subdag_OPERATOR,这样它就不会消耗您普通芹菜工人的所有资源。具体如下:我们应该将subdag_OPERATOR的默认执行器更改为SequentialExecutor。子操作员不支持气流池,因此它可能会消耗所有的工作资源(例如,在celeryExecutor中)。这会导致Airflow-74中提到的问题,并限制subdag_OPERATOR的使用。我们通过指定使用顺序执行程序来在生产中使用subdag_OPERATOR。
dag = DAG(
dag_id=DAG_NAME,
description=f"{DAG_NAME}-{__version__}",
...
)
with dag:
ur_operator = sub_dag_operator_with_default_executor(
task_id=f"your_task_id",
subdag=load_sub_dag(
parent_dag_name=DAG_NAME,
child_dag_name=f"your_child_dag_name",
args=args,
concurrency=dag_config.get("concurrency_in_sub_dag") or DEFAULT_CONCURRENCY,
),
queue="mini",
dag=dag
)
那么当您创建您的特殊芹菜工人(我们使用的是2核和3G内存这样的轻量级主机)时,指定AIRFLOW__CELERY__DEFAULT_QUEUE
为mini
,取决于您希望并行运行多少子DAG操作符,您应该创建多个专门的芹菜工人来负载均衡资源,我们建议每个特殊的芹菜工人一次最多注意2个子DAG操作符,否则会耗尽(例如,在2核和3G内存主机上内存用完)
您还可以通过在Airflow UIVariables
配置页面中创建的ENV VARconcurrency_in_sub_dag
在Subdag内部调整concurrency
。
更新[22/05/2020]以上仅适用于气流(<;=1.10.3,>=1.10.0) 对于超过1.10.3的气流,请使用
from airflow.executors import get_default_executor
相反。
这篇关于Airflow 1.10.3 SubDag即使并发性为8也只能并行运行1个任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!