How to pass SQL as a file with parameters to an Airflow Operator


Question

I have an Operator in Airflow:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql="""SELECT * FROM orders where orderid>{0}""".format(parameter),
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

Now, the actual query I need to run is 24 lines long. I want to save it in a file and give the operator the path to the SQL file. The operator supports this, but I'm not sure what to do with the parameter the SQL needs.

Suggestions?

Edit:
Here is my code:

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    templates_dict={'sql': '/home/ubuntu/airflow/.../orders_op.sql'},
    sql = '{{ templates_dict.sql }}',
    params={'last_imported_id': LAST_IMPORTED_ORDER_ID, 'table_name' :  TABLE_NAME},
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

This gives:


jinja2.exceptions.UndefinedError: 'templates_dict' is undefined


Answer

As you've noticed, the MySqlToGoogleCloudStorageOperator specifies template_ext = ('.sql',), and sql is one of its template_fields, so Airflow renders that argument with Jinja before the task runs.

First, in your DAG, specify the directory that contains your .sql file using template_searchpath:

dag = DAG(
    'my_dag',
    default_args=default_args,
    schedule_interval="30 7 * * *",
    template_searchpath=['/home/ubuntu/airflow/.../myfolder'])

Put your large query in yourfile.sql. Notice the params.ord_id placeholder:

SELECT * FROM orders where orderid> {{ params.ord_id }}
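At run time the operator sends MySQL the file contents with the placeholder filled in from the operator's params dictionary. The sketch below imitates only that substitution step using a regular expression. render_params is a hypothetical helper and is not part of Airflow or Jinja. It ignores everything else Jinja supports, such as filters, macros like {{ ds }}, and control blocks.

```python
import re

# Toy stand-in for Jinja (hypothetical): it only replaces
# "{{ params.<name> }}" placeholders. Airflow itself uses Jinja2,
# which supports far more syntax than this.
_PARAM_RE = re.compile(r"\{\{\s*params\.(\w+)\s*\}\}")


def render_params(template: str, params: dict) -> str:
    """Replace each {{ params.<name> }} with str(params[name])."""
    return _PARAM_RE.sub(lambda m: str(params[m.group(1)]), template)


sql_template = "SELECT * FROM orders where orderid> {{ params.ord_id }}"
print(render_params(sql_template, {"ord_id": 99}))
# -> SELECT * FROM orders where orderid> 99
```

With params={"ord_id": 99}, as in the operator call that follows, the query MySQL receives ends in orderid> 99.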

Now in the sql argument of the operator, pass the name of the file.

import_orders_op = MySqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mysql_conn_id='con1',
    google_cloud_storage_conn_id='con2',
    provide_context=True,
    sql='yourfile.sql',
    params={"ord_id":99},
    bucket=GCS_BUCKET_ID,
    filename=file_name,
    dag=dag) 

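It is important that you don't put a trailing space after the file name. Airflow checks whether the value of a templated field ends with one of the operator's template_ext extensions (here .sql). If it does, Airflow treats the value as the name of a template file, loads that file from the template search path, and renders its contents with Jinja. Otherwise, the string itself is rendered as the query.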
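The file-versus-string decision described above can be sketched in plain Python. This is a simplified, hypothetical illustration and not Airflow's actual code. resolve_templated_field is an invented name. The sketch raises FileNotFoundError, whereas Jinja raises its own TemplateNotFound. It also returns the raw file text rather than rendering it.

```python
import os
import tempfile


def resolve_templated_field(value, template_ext, searchpath):
    """Hypothetical sketch of how a templated field is interpreted.

    If `value` ends with one of `template_ext`, look the name up in each
    directory of `searchpath` and return the file's contents; otherwise
    treat `value` itself as the template text.
    """
    if isinstance(value, str) and value.endswith(tuple(template_ext)):
        for directory in searchpath:
            candidate = os.path.join(directory, value)
            if os.path.isfile(candidate):
                with open(candidate, encoding="utf-8") as f:
                    return f.read()
        raise FileNotFoundError(f"{value!r} not found in {searchpath}")
    return value


with tempfile.TemporaryDirectory() as folder:
    with open(os.path.join(folder, "yourfile.sql"), "w", encoding="utf-8") as f:
        f.write("SELECT * FROM orders where orderid> {{ params.ord_id }}")

    # Exact file name: the file's contents become the template.
    print(resolve_templated_field("yourfile.sql", (".sql",), [folder]))
    # Trailing space: the value no longer ends with ".sql", so the
    # name itself would be sent to MySQL as the "query".
    print(repr(resolve_templated_field("yourfile.sql ", (".sql",), [folder])))
```

The second call shows why the trailing space matters. The string "yourfile.sql " is passed through unchanged, so MySQL would receive a file name instead of a query.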
