Create dynamic pool in Airflow
Question
I have a DAG that creates a cluster, starts computation tasks, and, after they have completed, tears down the cluster. I want to limit the concurrency of the computation tasks running on this cluster to a fixed number. So, logically, I need a pool that is exclusive to the cluster created by a task. I don't want interference with other DAGs or with different runs of the same DAG.
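As a rough mental model (an analogy, not how the Airflow scheduler is implemented), a pool with N slots behaves like a counting semaphore shared by the tasks assigned to it: at most N of them run at once and the rest wait for a free slot. The sketch below uses Python's `threading.BoundedSemaphore` with a hypothetical `POOL_SLOTS = 4` and ten simulated computation tasks.

```python
import threading
import time

# Analogy only: a pool with POOL_SLOTS slots ~ a semaphore with POOL_SLOTS permits.
POOL_SLOTS = 4

pool = threading.BoundedSemaphore(POOL_SLOTS)
lock = threading.Lock()  # protects the counters below
running = 0              # tasks currently holding a slot
max_running = 0          # peak number of tasks seen holding a slot at once


def compute(task_id: int) -> None:
    """Simulated computation task that must hold a pool slot while it runs."""
    global running, max_running
    with pool:  # blocks until one of the POOL_SLOTS slots is free
        with lock:
            running += 1
            max_running = max(max_running, running)
        time.sleep(0.05)  # stand-in for work done on the cluster
        with lock:
            running -= 1


threads = [threading.Thread(target=compute, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"peak concurrency: {max_running}")
```

However many tasks are started, the peak never exceeds `POOL_SLOTS`, which is the guarantee the question wants for the tasks running on one cluster.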
I thought I could solve this problem by creating a pool dynamically from a task after the cluster is created, and deleting it once the computation tasks are finished. I thought I could template the pool parameter of the computation tasks to make them use this dynamically created pool.
# execute registers a pool and returns the pool name
create_pool = CreatePoolOperator(
    slots=4,
    task_id='create_pool',
    dag=self
)
# the pool parameter is templated
computation = ComputeOperator(
    task_id=compute_subtask_name,
    pool="{{ ti.xcom_pull(task_ids='create_pool') }}",
    dag=self
)
create_pool >> computation
But this way the computation tasks are never triggered. So I think the pool parameter is saved in the task instance before it is templated. I would like to hear your thoughts on how to achieve the desired behavior.
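The asker's hypothesis is consistent with how Airflow treats `pool`: it is not one of an operator's `template_fields`, and Jinja templates are rendered only when a task instance actually runs, after the scheduler has already checked the pool. The toy model below (hypothetical and heavily simplified; `ToyTask`, `schedulable`, and `registered_pools` are illustrative names, not Airflow APIs) shows the consequence: the scheduler looks up the literal `"{{ ... }}"` string, finds no pool by that name, and never schedules the task.

```python
# Toy model of the scheduling check; not Airflow's real scheduler code.
registered_pools = {"default_pool": 128, "cluster_pool": 4}  # name -> slots


class ToyTask:
    def __init__(self, task_id: str, pool: str) -> None:
        self.task_id = task_id
        self.pool = pool  # stored verbatim; never rendered before scheduling


def schedulable(task: ToyTask) -> bool:
    """Return True if the task's pool exists and has at least one slot."""
    slots = registered_pools.get(task.pool)  # raw string lookup, no rendering
    if slots is None:
        print(f"{task.task_id}: pool {task.pool!r} does not exist, not scheduled")
        return False
    return slots > 0


templated = ToyTask("computation", "{{ ti.xcom_pull(task_ids='create_pool') }}")
static = ToyTask("computation_static", "cluster_pool")

print(schedulable(templated))  # False: the raw template is not a pool name
print(schedulable(static))     # True
```

This is why a pool name must be a plain string known when the DAG is parsed, which is what the answer's operator relies on.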
Answer
Here is an operator that creates a pool:
from airflow.api.common.experimental.pool import get_pool, create_pool
from airflow.exceptions import PoolNotFound
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class CreatePoolOperator(BaseOperator):
    """Create an Airflow pool with the given name if it does not already exist."""

    # it's pool blue, get it?
    ui_color = '#b8e9ee'

    @apply_defaults
    def __init__(
            self,
            name,
            slots,
            description='',
            *args, **kwargs):
        super(CreatePoolOperator, self).__init__(*args, **kwargs)
        self.description = description
        self.slots = slots
        self.name = name

    def execute(self, context):
        try:
            pool = get_pool(name=self.name)
            if pool:
                # self.log is a logger object, so call a level method on it
                self.log.info(f'Pool exists: {pool}')
                return
        except PoolNotFound:
            # the pool does not exist yet, so create it
            pool = create_pool(name=self.name, slots=self.slots, description=self.description)
            self.log.info(f'Created pool: {pool}')
Deleting the pool can be done in a similar manner.
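A minimal sketch of the create/delete pairing, assuming a hypothetical in-memory registry so it runs without an Airflow installation: `InMemoryPools`, `ensure_pool`, and `remove_pool` are illustrative names, not Airflow APIs, and the local `PoolNotFound` only stands in for `airflow.exceptions.PoolNotFound`. In a real operator, the calls would go to `get_pool`, `create_pool`, and `delete_pool` from `airflow.api.common.experimental.pool` instead.

```python
from typing import Dict


class PoolNotFound(Exception):
    """Stand-in for airflow.exceptions.PoolNotFound in this toy model."""


class InMemoryPools:
    """Hypothetical in-memory stand-in for Airflow's pool table."""

    def __init__(self) -> None:
        self._pools: Dict[str, int] = {}  # name -> slots

    def get_pool(self, name: str) -> int:
        if name not in self._pools:
            raise PoolNotFound(name)
        return self._pools[name]

    def create_pool(self, name: str, slots: int) -> int:
        self._pools[name] = slots
        return slots

    def delete_pool(self, name: str) -> int:
        if name not in self._pools:
            raise PoolNotFound(name)
        return self._pools.pop(name)


def ensure_pool(pools: InMemoryPools, name: str, slots: int) -> bool:
    """Mirror of CreatePoolOperator.execute: create the pool only if missing.

    Returns True if a pool was created, False if it already existed.
    """
    try:
        pools.get_pool(name)
        return False
    except PoolNotFound:
        pools.create_pool(name, slots)
        return True


def remove_pool(pools: InMemoryPools, name: str) -> bool:
    """Delete counterpart: tolerate a pool that is already gone.

    Returns True if a pool was deleted, False if it did not exist.
    """
    try:
        pools.delete_pool(name)
        return True
    except PoolNotFound:
        return False


pools = InMemoryPools()
print(ensure_pool(pools, "cluster_pool", 4))  # True: created
print(ensure_pool(pools, "cluster_pool", 4))  # False: already existed
print(remove_pool(pools, "cluster_pool"))     # True: deleted
print(remove_pool(pools, "cluster_pool"))     # False: already gone
```

Making both steps tolerant of the pool already existing or already being gone keeps them safe to retry, which matters for a teardown step that may run more than once.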