使自定义气流宏扩展其他宏 [英] Make custom Airflow macros expand other macros
问题描述
是否可以在Airflow中制作一个用户定义的宏,而该宏本身是由其他宏计算得出的?
Is there any way to make a user-defined macro in Airflow which is itself computed from other macros?
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
dag = DAG(
'simple',
schedule_interval='0 21 * * *',
user_defined_macros={
'next_execution_date': '{{ dag.following_schedule(execution_date) }}',
},
)
task = BashOperator(
task_id='bash_op',
bash_command='echo "{{ next_execution_date }}"',
dag=dag,
)
这里的用例是向后移植新的Airflow v1.8 next_execution_date
宏可在Airflow v1.7中使用。不幸的是,此模板在没有宏扩展的情况下呈现:
The use case here is to back-port the new Airflow v1.8 next_execution_date
macro to work in Airflow v1.7. Unfortunately, this template is rendered without macro expansion:
$ airflow render simple bash_op 2017-08-09 21:00:00
# ----------------------------------------------------------
# property: bash_command
# ----------------------------------------------------------
echo "{{ dag.following_schedule(execution_date) }}"
推荐答案
以下是一些解决方案:
class NextExecutionDateAwareBashOperator(BashOperator):
def render_template(self, attr, content, context):
dag = context['dag']
execution_date = context['execution_date']
context['next_execution_date'] = dag.following_schedule(execution_date)
return super().render_templates(attr, content, context)
# or in python 2:
# return super(NextExecutionDateAwareBashOperator, self).render_templates(attr, content, context)
这种方法的优点:您可以捕获
The good part with this approach: you can capture some repeated code in your custom operator.
不好的地方:在呈现模板化字段之前,您必须编写一个自定义运算符以将值添加到上下文中。
The bad part: you have to write a custom operator to add values to the context, before templated fields are rendered.
宏不一定是值。它们可以是函数。
Macros are not necessarily values. They can be functions.
在您的dag中:
def compute_next_execution_date(dag, execution_date):
return dag.following_schedule(execution_date)
dag = DAG(
'simple',
schedule_interval='0 21 * * *',
user_defined_macros={
'next_execution_date': compute_next_execution_date,
},
)
task = BashOperator(
task_id='bash_op',
bash_command='echo "{{ next_execution_date(dag, execution_date) }}"',
dag=dag,
)
很好的部分:您可以定义可重用函数来处理运行时可用的值( XCom值,作业实例属性,任务实例属性等),并使函数结果可用于呈现模板。
The good part: you can define reusable functions to process values available at runtime (XCom values, job instance properties, task instance properties, etc...), and make your function result available to render a template.
不好的部分(但不是那么烦人):您必须导入用户定义的mac这样的函数
The bad part (but not that annoying): you have to import such a function as a user defined macro in every dag where needed.
此解决方案最简单(如 Ardan的答案),在您的情况下可能是个好答案。
This solution is the simplest (as mentioned by Ardan's answer), and probably the good one in your case.
BashOperator(
task_id='bash_op',
bash_command='echo "{{ dag.following_schedule(execution_date) }}"',
dag=dag,
)
像这样的简单呼叫的理想选择。它们是可以直接作为宏使用的其他一些对象(例如 task
, task_instance
等);甚至某些标准模块都可用(例如 macros.time
,...)。
Ideal for simple calls like this one. And they are some other objects directly available as macros (like task
, task_instance
, etc...); even some standard modules are available (like macros.time
, ...).
这篇关于使自定义气流宏扩展其他宏的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!