Create dynamic pool in Airflow


Question

I have a DAG that creates a cluster, starts computation tasks, and tears down the cluster once they have completed. I want to limit the concurrency of the computation tasks running on this cluster to a fixed number. So logically, I need a pool that is exclusive to the cluster created by a task. I don't want interference from other DAGs or from different runs of the same DAG.

I thought I could solve this problem by creating a pool dynamically from a task after the cluster is created, and deleting it once the computation tasks are finished. I thought I could template the pool parameter of the computation tasks to make them use this dynamically created pool.

# execute registers a pool and returns with the pool name
create_pool = CreatePoolOperator(
    slots=4,
    task_id='create_pool',
    dag=self
)

# the pool parameter is templated
computation = ComputeOperator(
    task_id=compute_subtask_name,
    pool="{{ ti.xcom_pull(task_ids='create_pool') }}",
    dag=self
)

create_pool >> computation

But this way the computation tasks are never triggered. So I think the pool parameter is saved in the task instance before it is templated. I would like to hear your thoughts on how to achieve the desired behavior.

Recommended answer

Here is an operator that creates a pool if it does not already exist:

from airflow.api.common.experimental.pool import get_pool, create_pool
from airflow.exceptions import PoolNotFound
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class CreatePoolOperator(BaseOperator):
    # it's pool blue, get it?
    ui_color = '#b8e9ee'

    @apply_defaults
    def __init__(
            self,
            name,
            slots,
            description='',
            *args, **kwargs):
        super(CreatePoolOperator, self).__init__(*args, **kwargs)
        self.description = description
        self.slots = slots
        self.name = name

    def execute(self, context):
        try:
            # get_pool raises PoolNotFound when no pool has this name
            pool = get_pool(name=self.name)
            if pool:
                self.log.info(f'Pool exists: {pool}')
                return
        except PoolNotFound:
            # create the pool
            pool = create_pool(name=self.name, slots=self.slots, description=self.description)
            self.log.info(f'Created pool: {pool}')

Deleting the pool could be done in a similar manner.
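As a rough sketch of what "a similar manner" might look like, the helper below deletes a pool and tolerates the pool already being gone. It assumes the same Airflow 1.10.x experimental API used in the operator above (`delete_pool` from `airflow.api.common.experimental.pool`, `PoolNotFound` from `airflow.exceptions`). The function name `remove_pool` and its `delete_fn` / `not_found` parameters are hypothetical, introduced only so the logic can be exercised without an Airflow installation.

```python
import logging

log = logging.getLogger(__name__)


def remove_pool(name, delete_fn=None, not_found=None):
    """Delete the pool called `name`.

    Returns True if a pool was deleted, False if no such pool existed.
    `delete_fn` and `not_found` are hypothetical hooks for testing and
    must be passed together; when omitted, the Airflow functions are used.
    """
    if delete_fn is None:
        # Assumption: Airflow 1.10.x experimental API, imported lazily so
        # that this module can be loaded where Airflow is not installed.
        from airflow.api.common.experimental.pool import delete_pool
        from airflow.exceptions import PoolNotFound
        delete_fn, not_found = delete_pool, PoolNotFound
    try:
        pool = delete_fn(name=name)
    except not_found:
        # Nothing to clean up, e.g. the create task never ran
        log.info('Pool not found, nothing to delete: %s', name)
        return False
    log.info('Deleted pool: %s', pool)
    return True
```

The same body could be placed in the `execute` method of an operator shaped like `CreatePoolOperator`, or the function could be passed as the `python_callable` of a `PythonOperator`. Because the pool should be removed even when a computation task fails, the cleanup task would probably want a trigger rule such as `all_done`.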
