Reading sql file with jinja templates in custom operator in Airflow


Problem description

I am trying to read an SQL file that contains a query with Jinja templates in a custom operator in Airflow. I have already achieved this using a PythonOperator that calls a function where I used:

def execute_query(**kwargs):
    sql_query = open('my_sql_query.sql').read()   # (SELECT * FROM my_table WHERE date > {})
    sql_query = sql_query.format(kwargs['ds'])
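One detail worth noting about the snippet above: `str.format` returns a new string rather than modifying the original, so its result has to be captured. A minimal standalone illustration (the date is just a made-up value standing in for `ds`):

```python
# str.format does not modify the string in place; capture its return value.
template = "SELECT * FROM my_table WHERE date > '{}'"
rendered = template.format("2020-03-24")
print(rendered)  # SELECT * FROM my_table WHERE date > '2020-03-24'
```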

but I would prefer to use the {{ ds }} syntax directly in the query, like SELECT * FROM my_table WHERE date > {{ ds }}
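Outside of Airflow, that kind of {{ ds }} substitution can be reproduced with plain Jinja2, which is the engine Airflow uses for templating. A quick sketch, with a hard-coded date standing in for the real execution date:

```python
from jinja2 import Template

# Render the {{ ds }} placeholder the way Airflow's templating would,
# using a hard-coded execution date for illustration.
query = Template("SELECT * FROM my_table WHERE date > '{{ ds }}'").render(ds="2020-03-24")
print(query)  # SELECT * FROM my_table WHERE date > '2020-03-24'
```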

What I did:

  1. I created a custom operator with template_fields and template_ext:

class SQLOperator(BaseOperator):

    template_fields = ['sql']
    template_ext = ('.sql',)

    @apply_defaults
    def __init__(
            self,
            name=None,
            sql=None,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name
        self.sql = sql

    def execute(self, context):
        print("Name", self.name)  # <- works
        print("Query", self.sql)  # <- doesn't work and I don't know how to get the sql file content

  2. The DAG:

default_args = {...}

dag = DAG(
    'sql_operator_test',
     schedule_interval='0 0 * * *',
     template_searchpath=['/Users/username/airflow/dags/sql/test/'],
     default_args=default_args)

sql_task = SQLOperator(
    task_id='sql_process',
    name="Aaa",
    sql="/Users/username/airflow/dags/sql/test.sql",
    dag=dag)

  3. The SQL query:

SELECT * FROM my_table WHERE date > {{ ds }}

I am running out of ideas. Is there any option to pass the file to the operator or get its rendered content?

Answer

Your approach is OK. I took your code and created a working example showing that {{ ds }} is being templated as required:

Create a .py file as:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SQLOperator(BaseOperator):

    template_fields = ['sql']
    template_ext = ('.sql',)

    @apply_defaults
    def __init__(
            self,
            name=None,
            sql=None,
            *args,
            **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.sql = sql

    def execute(self, context):
        print("Name", self.name)  # <- works
        print("Query", self.sql)  # <- Also works :)


default_args = {
    'owner': 'a',
    'start_date': datetime(2020, 3, 24, 2, 0, 0),
}


dag = DAG(
    'sql_operator_test',
    schedule_interval=None,
    default_args=default_args)

sql_task = SQLOperator(
    task_id='sql_process',
    name="Aaa",
    sql="test.sql",
    dag=dag)

The .sql file is:

SELECT * FROM my_table WHERE date > {{ ds }}

Running it gives: [screenshot of the successful run]

And from the task log: [screenshot showing the rendered query]
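The way the relative sql="test.sql" path is resolved can also be mimicked with Jinja2 directly: Airflow renders any field listed in template_fields whose value ends in one of template_ext, loading the file from the DAG folder or the template_searchpath directories. A rough standalone sketch of that mechanism, with a temporary directory standing in for the searchpath and a made-up date standing in for ds:

```python
import pathlib
import tempfile

from jinja2 import Environment, FileSystemLoader

# A temporary directory stands in for a template_searchpath folder
# containing the .sql file from the example above.
searchpath = tempfile.mkdtemp()
pathlib.Path(searchpath, "test.sql").write_text(
    "SELECT * FROM my_table WHERE date > {{ ds }}")

# Airflow builds a Jinja environment over the searchpath folders and
# renders the template file with the task context (ds, etc.).
env = Environment(loader=FileSystemLoader(searchpath))
rendered = env.get_template("test.sql").render(ds="2020-03-24")
print(rendered)  # SELECT * FROM my_table WHERE date > 2020-03-24
```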
