Reading a SQL file with Jinja templates in a custom operator in Airflow
Problem description
I am trying to read a SQL file that contains a query with Jinja templates in a custom operator in Airflow. I have already achieved this with a PythonOperator that calls a function where I used:
def execute_query(**kwargs):
    # my_sql_query.sql contains: SELECT * FROM my_table WHERE date > {}
    with open('my_sql_query.sql') as f:
        sql_query = f.read()
    sql_query = sql_query.format(kwargs['ds'])
but I would prefer to use the {{ ds }} syntax directly in the query, like:
SELECT * FROM my_table WHERE date > {{ ds }}
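A quick plain-Python check (no Airflow needed) shows why the str.format function above cannot simply be pointed at a Jinja-style file: str.format treats doubled braces as escaped literal braces, so {{ ds }} comes out as { ds } instead of the date. The value used for ds here is an arbitrary example.

```python
# Query written for str.format, as in the PythonOperator version
format_style = "SELECT * FROM my_table WHERE date > {}"
# The same query written with a Jinja-style placeholder
jinja_style = "SELECT * FROM my_table WHERE date > {{ ds }}"

ds = "2020-03-24"  # example execution date stamp

rendered_format = format_style.format(ds)  # "{}" is replaced by the date
rendered_jinja = jinja_style.format(ds)    # "{{" / "}}" become literal "{" / "}"

print(rendered_format)  # SELECT * FROM my_table WHERE date > 2020-03-24
print(rendered_jinja)   # SELECT * FROM my_table WHERE date > { ds }
```

So once the file uses {{ ds }}, the rendering has to be done by Jinja, which is what template_fields and template_ext hand over to Airflow.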
What I have done:
- I created a custom operator with template_fields and template_ext:
class SQLOperator(BaseOperator):
    template_fields = ['sql']
    template_ext = ('.sql',)

    @apply_defaults
    def __init__(
            self,
            name=None,
            sql=None,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name
        self.sql = sql

    def execute(self, context):
        print("Name", name)  # <- works
        print("Query", sql)  # <- doesn't work and I don't know how to get the sql file content
- DAG:
default_args = {...}

dag = DAG(
    'sql_operator_test',
    schedule_interval='0 0 * * *',
    template_searchpath=['/Users/username/airflow/dags/sql/test/'],
    default_args=default_args)

sql_task = SQLOperator(
    task_id='sql_process',
    name="Aaa",
    sql="/Users/username/airflow/dags/sql/test.sql",
    dag=dag)
- SQL query:
SELECT * FROM my_table WHERE date > {{ ds }}
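What template_fields and template_ext ask Airflow to do can be sketched roughly as follows. This is a simplified, stdlib-only model, not Airflow's implementation: SQLOperatorModel, render_placeholders and render_template_fields are hypothetical names, the templates dict stands in for the files Jinja would find on its search path, and only the plain {{ key }} form is handled (real Jinja supports far more). It shows that before execute() runs, the templated attribute itself is replaced by the rendered query, so execute() has to read it as self.sql.

```python
import re


class SQLOperatorModel:
    """Hypothetical stand-in mirroring the question's SQLOperator (not Airflow code)."""

    template_fields = ["sql"]
    template_ext = (".sql",)

    def __init__(self, name=None, sql=None):
        self.name = name
        self.sql = sql

    def execute(self, context):
        # By now self.sql holds the rendered query, not the file name.
        return self.sql


def render_placeholders(text, context):
    # Replaces each "{{ key }}" with context[key]; a tiny subset of Jinja.
    return re.sub(r"\{\{\s*(\w+)\s*\}\}", lambda m: str(context[m.group(1)]), text)


def render_template_fields(task, context, templates):
    """Simplified model of the rendering step that runs before execute().

    `templates` maps file names to file contents and stands in for the loader.
    """
    for field in task.template_fields:
        value = getattr(task, field)
        if not isinstance(value, str):
            continue
        if value.endswith(task.template_ext):
            value = templates[value]  # looks like a file name: load its contents
        setattr(task, field, render_placeholders(value, context))


templates = {"test.sql": "SELECT * FROM my_table WHERE date > {{ ds }}"}
task = SQLOperatorModel(name="Aaa", sql="test.sql")
context = {"ds": "2020-03-24"}

render_template_fields(task, context, templates)
print(task.execute(context))  # SELECT * FROM my_table WHERE date > 2020-03-24
```

The point the model illustrates: rendering overwrites the attribute in place, so no extra call is needed inside execute() to obtain the file's rendered contents.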
I am running out of ideas. Is there any way to pass the file to the operator, or to get its rendered content?
Recommended answer
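Your approach is OK. I took your code and created a working example showing that {{ ds }} is templated as required.
The key change is in execute(): name and sql are instance attributes, so they must be read as self.name and self.sql. By the time execute() runs, Airflow has already rendered every field listed in template_fields, and because the value ends with an extension from template_ext it is treated as a template file, so self.sql holds the rendered file contents. The example also passes sql="test.sql", a name that Jinja looks up relative to the DAG's folder (plus any template_searchpath), instead of an absolute path.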
Create a .py file with:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SQLOperator(BaseOperator):
    template_fields = ['sql']
    template_ext = ('.sql',)

    @apply_defaults
    def __init__(
            self,
            name=None,
            sql=None,
            *args,
            **kwargs
    ) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.sql = sql

    def execute(self, context):
        print("Name", self.name)  # <- works
        print("Query", self.sql)  # <- Also works :)


default_args = {
    'owner': 'a',
    'start_date': datetime(2020, 3, 24, 2, 0, 0),
}

dag = DAG(
    'sql_operator_test',
    schedule_interval=None,
    default_args=default_args)

sql_task = SQLOperator(
    task_id='sql_process',
    name="Aaa",
    sql="test.sql",
    dag=dag)
and the .sql file (test.sql, placed next to the DAG file) with:
SELECT * FROM my_table WHERE date > {{ ds }}
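The example passes sql="test.sql" rather than the absolute path used in the question. The sketch below is a hypothetical, simplified model of how a Jinja FileSystemLoader-style lookup treats a template name, assuming Airflow builds its loader from the DAG's folder plus template_searchpath: the name is split on "/" and joined under each search path in turn, so a leading "/" is dropped instead of making the path absolute. find_template is an illustrative helper, not a Jinja or Airflow function, and other versions may behave differently.

```python
import os
import tempfile


def find_template(name, search_paths):
    """Hypothetical helper: resolve a template name against search paths,
    loosely modelled on Jinja's FileSystemLoader."""
    pieces = []
    for piece in name.split("/"):
        if os.path.sep in piece or piece == os.path.pardir:
            return None  # Jinja raises TemplateNotFound here; the model returns None
        if piece and piece != ".":
            pieces.append(piece)  # empty pieces (e.g. from a leading "/") are dropped
    for folder in search_paths:
        candidate = os.path.join(folder, *pieces)
        if os.path.isfile(candidate):
            return candidate
    return None


with tempfile.TemporaryDirectory() as dags_folder:
    # Stand-in for the DAG folder, with test.sql next to the DAG file
    sql_path = os.path.join(dags_folder, "test.sql")
    with open(sql_path, "w") as f:
        f.write("SELECT * FROM my_table WHERE date > {{ ds }}\n")

    search_paths = [dags_folder]  # DAG folder (+ any template_searchpath entries)
    relative_hit = find_template("test.sql", search_paths)
    absolute_hit = find_template(sql_path, search_paths)
    print("relative name found:", relative_hit == sql_path)
    print("absolute path found:", absolute_hit is not None)
```

Under this model, the question's /Users/username/airflow/dags/sql/test.sql would be looked up as Users/username/airflow/dags/sql/test.sql beneath each search path and not found.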
Running it gives:
[screenshot of the DAG run]
And from the task log:
[screenshot of the task log]