Run parallel tasks in Apache Airflow
Question
I am able to configure the airflow.cfg file to run tasks one after the other.
What I want to do is execute tasks in parallel, e.g. 2 at a time, until I reach the end of the list.
How can I configure that?
Answer
Executing tasks in Airflow in parallel depends on which executor you're using, e.g., SequentialExecutor, LocalExecutor, CeleryExecutor, etc.
For a simple setup, you can achieve parallelism by just setting your executor to LocalExecutor in your airflow.cfg:
[core]
executor = LocalExecutor
Reference: https://github.com/apache/incubator-airflow/blob/29ae02a070132543ac92706d74d9a5dc676 #L76
This will spin up a separate process for each task.
(Of course you'll need to have a DAG with at least 2 tasks that can execute in parallel to see it work.)
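As a sketch, a minimal DAG file with two independent tasks might look like the following (the DAG id, task ids, and dates are hypothetical; the imports reflect the older incubator-era Airflow layout this answer references). Since neither task depends on the other, the LocalExecutor can run them simultaneously:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id="parallel_example",          # hypothetical DAG id
    start_date=datetime(2018, 1, 1),
    schedule_interval=None,
)

# No dependency is declared between these two tasks, so with
# LocalExecutor (or CeleryExecutor) they can run at the same time.
task_a = BashOperator(task_id="task_a", bash_command="sleep 5", dag=dag)
task_b = BashOperator(task_id="task_b", bash_command="sleep 5", dag=dag)
```

With SequentialExecutor the same DAG would still work, but the two tasks would run one after the other.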
Alternatively, with CeleryExecutor, you can spin up any number of workers by just running (as many times as you want):
$ airflow worker
The tasks will go into a Celery queue and each Celery worker will pull off of the queue.
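That queue/worker model can be illustrated with a plain standard-library sketch (this is only an analogy, not Celery itself): several workers pull from one shared queue, so adding workers adds parallelism without changing the producer.

```python
import queue
import threading

task_queue = queue.Queue()
results = []
lock = threading.Lock()

def worker(worker_id):
    # Each worker pulls tasks off the shared queue until it is empty,
    # mirroring how each Celery worker consumes from the Celery queue.
    while True:
        try:
            task = task_queue.get_nowait()
        except queue.Empty:
            return
        with lock:
            results.append((worker_id, task))
        task_queue.task_done()

for t in ["task_a", "task_b", "task_c", "task_d"]:
    task_queue.put(t)

# Two workers drain the queue concurrently, like two `airflow worker`
# processes would; every task is handled exactly once.
threads = [threading.Thread(target=worker, args=(i,)) for i in range(2)]
for th in threads:
    th.start()
for th in threads:
    th.join()

print(sorted(task for _, task in results))
```

Which worker gets which task is nondeterministic, but each task is consumed exactly once.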
You might find the section Scaling Out with Celery in the Airflow Configuration docs helpful.
https://airflow.apache.org/howto/executor/use-celery.html
For any executor, you may want to tweak the core settings that control parallelism once you have that running.
They're all found under [core]. These are the defaults:
# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
# Are DAGs paused by default at creation
dags_are_paused_at_creation = True
# When not using pools, tasks are run in the "default pool",
# whose size is guided by this config element
non_pooled_task_slot_count = 128
# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16
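Roughly speaking, the number of task instances one DAG can run at once is capped by the smallest of these limits; the exact interaction is version-dependent, so treat this as an approximation rather than Airflow's precise scheduling logic:

```python
# Defaults from the config above.  The effective per-DAG cap is
# (roughly) the smallest of these limits; treat this as an
# approximation, not Airflow's exact scheduling logic.
parallelism = 32                   # installation-wide cap on running task instances
dag_concurrency = 16               # per-DAG cap enforced by the scheduler
non_pooled_task_slot_count = 128   # default-pool slots when no pool is set

effective_per_dag = min(parallelism, dag_concurrency, non_pooled_task_slot_count)
print(effective_per_dag)  # 16
```

So with the defaults, a single DAG tops out at 16 concurrent task instances even though the installation as a whole allows 32.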
Reference: https://github.com/apache/incubator-airflow/blob/29ae02a070132543ac92706d74d9a5dc676_config/9cf