External files in Airflow DAG

Question

I'm trying to access external files in an Airflow task to read some SQL, and I'm getting "file not found". Has anyone come across this?

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    'my_dat',
    start_date=datetime(2017, 1, 1),
    catchup=False,
    schedule_interval=timedelta(days=1)
)

def run_query():
    # read the query
    query = open('sql/queryfile.sql')
    # run the query
    execute(query)

tas = PythonOperator(
    task_id='run_query', dag=dag, python_callable=run_query)

The log states the following:

IOError: [Errno 2] No such file or directory: 'sql/queryfile.sql'

I understand that I could simply copy and paste the query into the same file, but that's really not a neat solution. There are multiple queries and the text is really big; embedding it in the Python code would compromise readability.

Recommended answer

Here is an example that uses a Variable to make this easy.


  • First, add a Variable in the Airflow UI under Admin -> Variables, e.g. {key: 'sql_path', value: 'your_sql_script_folder'}

Then add the following code to your DAG to read the Variable from Airflow.

DAG code:

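import airflow
from airflow.models import Variable

# Folder stored under the 'sql_path' Variable in the Airflow UI
tmpl_search_path = Variable.get("sql_path")

dag = airflow.DAG(
    'tutorial',
    schedule_interval="@daily",
    template_searchpath=tmpl_search_path,  # this
    default_args=default_args  # assumed to be defined elsewhere in the DAG file
)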




  • Now you can refer to a SQL script by its name, or by its path relative to the folder stored in the Variable.
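
For the PythonOperator in the question, which opens the file itself, the same folder can also be used to build an absolute path. The following is only a minimal sketch under stated assumptions: `read_sql` and the two-argument `run_query` are hypothetical helpers, not part of Airflow, and inside a DAG file the `sql_dir` argument would come from `Variable.get("sql_path")`.

```python
import os


def read_sql(sql_dir, script_name):
    """Return the text of `script_name` located inside `sql_dir`.

    Hypothetical helper, not part of Airflow. Building an absolute path
    avoids depending on the current working directory of the process
    that runs the task.
    """
    path = os.path.join(os.path.abspath(sql_dir), script_name)
    with open(path) as sql_file:
        return sql_file.read()


def run_query(sql_dir, script_name="queryfile.sql"):
    # In a DAG file, sql_dir would be Variable.get("sql_path") (assumption).
    query = read_sql(sql_dir, script_name)
    # Pass `query` to your database client here; it is returned
    # only so the sketch can be checked in isolation.
    return query
```

A relative path such as 'sql/queryfile.sql' is resolved against the working directory of the running process, not against the folder containing the DAG file, which is the likely cause of the IOError above. Joining the script name onto a known folder sidesteps that.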

