使用 jinja 模板中的气流连接 [英] use Airflow connection from a jinja template

查看:25
本文介绍了使用 jinja 模板中的气流连接的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试使用环境变量将 DB 参数传递给 BashOperator,但我找不到任何文档/示例如何使用 Jinja 模板中的连接.

I'm trying to pass DB params to BashOperator using environment variables, but I can't find any documentation/examples how to use a connection from a Jinja template.

所以我正在寻找类似于变量的东西

So I'm looking for something similar to variables

echo {{ var.value.<variable_name> }}

推荐答案

For Airflow >= 2.2.0:

假设你有 conn id test_conn 你可以通过以下方式直接使用宏:

Assuming you have conn id test_conn you can use macros directly via:

{{ conn.test_conn }} 所以你得到任何连接属性,如:

{{ conn.test_conn }} so you get any connection attribute like:

{{ conn.test_conn.host }}, {{ conn.test_conn.login }}, {{ conn.test_conn.password }} 等等.

对于气流<2.2.0:

没有现成的宏,但您可以创建自定义宏来解决这个问题.

There is no ready to use macro however you can create custom macros to address this.

连接示例:

创建宏:

def get_host(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.host

def get_schema(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.schema

def get_login(conn_id):
    connection = BaseHook.get_connection(conn_id)
    return connection.login

在 DAG 中使用它们:

Using them in a DAG:

def print_function(**context):
    print(f"host={context['host']} schema={context['schema']} login={context['login']}")

user_macros = {
    'get_host': get_host,
    'get_schema': get_schema,
    'get_login': get_login,
}

with DAG(
    dag_id='connection',
    default_args=default_args,
    schedule_interval=None,
    user_defined_macros=user_macros,
) as dag:

# Example how to use as function
python_op = PythonOperator( 
    task_id='python_task',
    provide_context=True,
    python_callable=print_function,
    op_kwargs={
        'host': get_host("test_conn"),
        'schema': get_schema("test_conn"),
        'login': get_login("test_conn"),
    }
)

# Example how to use as Jinja string
bash_op = BashOperator( 
    task_id='bash_task',
    bash_command='echo {{ get_host("test_conn") }} {{ get_schema("test_conn") }} {{ get_login("test_conn") }} ',
)

PythonOperator 的渲染示例:

BashOperator 的渲染示例:

一般说明:这段代码的作用是创建一个自定义函数 func() 用作 user_defined_macros 从而提供使用它的能力,就像 Airflow 本身定义的这个宏一样.您可以通过以下方式访问模板: {{ func() }} 如示例中所示,该函数允许接受参数.

General Explnation: What this code does is creating a custom function func() to be used as user_defined_macros thus providing the ability to use it just like this macro was defined by Airflow itself. You can access the templating as: {{ func() }} as seen in the example the function allow accept parameters.

注意您可以为连接对象中的所有字段创建这样的函数.

Note you can create such functions for all fields in the connection object.

谨慎使用它,以文本形式传递密码可能不是一个好主意.

be cautious with how you use it, passing passwords as text may not be a good idea.

这篇关于使用 jinja 模板中的气流连接的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆