如何将带有参数的SQL作为文件传递给Airflow Operator [英] How to pass SQL as file with parameters to Airflow Operator
问题描述
我在气流中有一个操作员:
I have an Operator in Airflow:
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
mysql_conn_id='con1',
google_cloud_storage_conn_id='con2',
provide_context=True,
sql="""SELECT * FROM orders where orderid>{0}""".format(parameter),
bucket=GCS_BUCKET_ID,
filename=file_name,
dag=dag)
现在,我需要运行的实际查询长度为24行。我想将其保存在文件中,并为操作员提供SQL文件的路径。运算符支持此操作,但是我不确定该使用SQL参数。
Now, the actual query I need to run is 24 rows long. I want to save it in a file and give the operator the path for the SQL file. The operator support this but I'm not sure what to do with the parameter the SQL is needed.
建议?
编辑:
这是我的代码:
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
mysql_conn_id='con1',
google_cloud_storage_conn_id='con2',
provide_context=True,
templates_dict={'sql': '/home/ubuntu/airflow/.../orders_op.sql'},
sql = '{{ templates_dict.sql }}',
params={'last_imported_id': LAST_IMPORTED_ORDER_ID, 'table_name' : TABLE_NAME},
bucket=GCS_BUCKET_ID,
filename=file_name,
dag=dag)
这给出:
jinja2.exceptions.UndefinedError:'templates_dict'未定义
jinja2.exceptions.UndefinedError: 'templates_dict' is undefined
推荐答案
您已经注意到, MySqlToGoogleCloudStorageOperator 指定一个扩展名为.sql的 template_ext
。
As you've noticed, the MySqlToGoogleCloudStorageOperator specifies a template_ext
with the .sql extension.
首先在 Dag
中,指定放置.sql文件的路径
First in your Dag
, specify the path where you put your .sql file
dag = DAG('my_dag', default_args=default_args, schedule_interval="30 7 * * *", template_searchpath = ['/home/ubuntu/airflow/.../myfolder'])
在yourfile.sql中放入大型查询。请注意 params.ord_id
In the yourfile.sql put your large query. Notice the params.ord_id
SELECT * FROM orders where orderid> {{ params.ord_id }}
现在在 sql $ c中$ c>运算符的参数,传递文件名。
Now in the sql
argument of the operator, pass the name of the file.
import_orders_op = MySqlToGoogleCloudStorageOperator(
task_id='import_orders',
mysql_conn_id='con1',
google_cloud_storage_conn_id='con2',
provide_context=True,
sql='yourfile.sql',
params={"ord_id":99},
bucket=GCS_BUCKET_ID,
filename=file_name,
dag=dag)
重要的是您请勿在该文件名后添加空格。这是因为Jinja模板引擎将查找以 .sql
结尾的字符串,如果这样做,它将把它视为文件而不是字符串。
It is important that you don't put a space after that file name. This is because the Jinja templating engine will look for that string ending with .sql
and if it does, it will treat it as a file rather than as a string.
这篇关于如何将带有参数的SQL作为文件传递给Airflow Operator的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!