使自定义气流宏扩展其他宏 [英] Make custom Airflow macros expand other macros

查看:77
本文介绍了使自定义气流宏扩展其他宏的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

是否可以在Airflow中制作一个用户定义的宏,而该宏本身是由其他宏计算得出的?

Is there any way to make a user-defined macro in Airflow which is itself computed from other macros?

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': '{{ dag.following_schedule(execution_date) }}',
    },
)

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date }}"',
    dag=dag,
)

这里的用例是向后移植新的Airflow v1.8 next_execution_date 宏可在Airflow v1.7中使用。不幸的是,此模板在没有宏扩展的情况下呈现:

The use case here is to back-port the new Airflow v1.8 next_execution_date macro to work in Airflow v1.7. Unfortunately, this template is rendered without macro expansion:

$ airflow render simple bash_op 2017-08-09 21:00:00
    # ----------------------------------------------------------
    # property: bash_command
    # ----------------------------------------------------------
    echo "{{ dag.following_schedule(execution_date) }}"


推荐答案

以下是一些解决方案:

class NextExecutionDateAwareBashOperator(BashOperator):
    def render_template(self, attr, content, context):
        dag = context['dag']
        execution_date = context['execution_date']
        context['next_execution_date'] = dag.following_schedule(execution_date)

        return super().render_templates(attr, content, context)
        # or in python 2:
        # return super(NextExecutionDateAwareBashOperator, self).render_templates(attr, content, context)

这种方法的优点:您可以捕获

The good part with this approach: you can capture some repeated code in your custom operator.

不好的地方:在呈现模板化字段之前,您必须编写一个自定义运算符以将值添加到上下文中。

The bad part: you have to write a custom operator to add values to the context, before templated fields are rendered.

不一定是值。它们可以是函数。

Macros are not necessarily values. They can be functions.

在您的dag中:

def compute_next_execution_date(dag, execution_date):
    return dag.following_schedule(execution_date)

dag = DAG(
    'simple',
    schedule_interval='0 21 * * *',
    user_defined_macros={
        'next_execution_date': compute_next_execution_date,
    },
)

task = BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
    dag=dag,
)

很好的部分:您可以定义可重用函数来处理运行时可用的值( XCom值,作业实例属性,任务实例属性等),并使函数结果可用于呈现模板。

The good part: you can define reusable functions to process values available at runtime (XCom values, job instance properties, task instance properties, etc...), and make your function result available to render a template.

不好的部分(但不是那么烦人):您必须导入用户定义的mac这样的函数

The bad part (but not that annoying): you have to import such a function as a user defined macro in every dag where needed.

此解决方案最简单(如 Ardan的答案),在您的情况下可能是个好答案。

This solution is the simplest (as mentioned by Ardan's answer), and probably the good one in your case.

BashOperator(
    task_id='bash_op',
    bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
    dag=dag,
)

像这样的简单呼叫的理想选择。它们是可以直接作为使用的其他一些对象(例如 task task_instance 等);甚至某些标准模块都可用(例如 macros.time ,...)。

Ideal for simple calls like this one. And they are some other objects directly available as macros (like task, task_instance, etc...); even some standard modules are available (like macros.time, ...).

这篇关于使自定义气流宏扩展其他宏的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆