How can I control the parallelism or concurrency of an Airflow DAG?


Question

In some of my Airflow installations, DAGs or tasks that are scheduled to run do not run even when the scheduler is not fully loaded. How can I increase the number of DAGs or tasks that can run concurrently?

Similarly, if my installation is under high load and I want to limit how quickly my Airflow workers pull queued tasks, what can I adjust?

Answer

Here's an expanded list of configuration options that are available in Airflow v1.10.2. Some can be set on a per-DAG or per-operator basis and may fall back to the setup-wide defaults if unspecified.

Options that can be specified on a per-DAG basis:


  • concurrency: the number of task instances allowed to run concurrently across all active runs of the DAG this is set on. Defaults to core.dag_concurrency if not set
  • max_active_runs: maximum number of active runs for this DAG. The scheduler will not create new active DAG runs once this limit is hit. Defaults to core.max_active_runs_per_dag if not set

An example:

from airflow import DAG

# Only allow one run of this DAG to be running at any given time
dag = DAG('my_dag_id', max_active_runs=1)

# Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
dag = DAG('example2', concurrency=10, max_active_runs=2)

Options that can be specified on a per-operator basis:


  • pool: the pool to execute the task in. Pools can be used to limit parallelism for only a subset of tasks
  • task_concurrency: concurrency limit for the same task across multiple DAG runs (see the sketch after the example below)

An example:

t1 = BaseOperator(task_id='t1', pool='my_custom_pool', task_concurrency=12)
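
In practice these two arguments are passed to whatever concrete operator you use. Below is a minimal sketch, assuming Airflow 1.10.x and that a pool named 'my_custom_pool' has already been created through the UI or CLI; the DAG id, schedule, and bash command are only illustrative:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG('pool_example', start_date=datetime(2019, 1, 1), schedule_interval='@daily')

# Every task pointed at 'my_custom_pool' competes for that pool's slots,
# while task_concurrency caps this particular task across its own DAG runs.
extract = BashOperator(
    task_id='extract',
    bash_command='echo extracting',
    pool='my_custom_pool',
    task_concurrency=12,
    dag=dag,
)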

Options that are specified across an entire Airflow setup (an airflow.cfg sketch follows the list below):


  • core.parallelism: maximum number of tasks running across an entire Airflow installation
  • core.dag_concurrency: max number of tasks that can be running per DAG (across multiple DAG runs)
  • core.non_pooled_task_slot_count: number of task slots allocated to tasks not running in a pool
  • core.max_active_runs_per_dag: maximum number of active DAG runs, per DAG
  • scheduler.max_threads: how many threads the scheduler process should use to schedule DAGs
  • celery.worker_concurrency: number of task instances that a worker will take if using CeleryExecutor
  • celery.sync_parallelism: number of processes CeleryExecutor should use to sync task state
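
All of these keys live in airflow.cfg under the section named before the dot, and each can also be overridden with an AIRFLOW__SECTION__KEY environment variable. A minimal airflow.cfg sketch; the values shown are only illustrative (roughly the shipped 1.10 defaults), not tuning recommendations:

[core]
parallelism = 32
dag_concurrency = 16
non_pooled_task_slot_count = 128
max_active_runs_per_dag = 16

[scheduler]
max_threads = 2

[celery]
worker_concurrency = 16
sync_parallelism = 0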
