Python script scheduling in Airflow


Question

Hi everyone,

I need to schedule my Python files (which contain data extraction from SQL and some joins) using Airflow. I have successfully installed Airflow on my Linux server, and the Airflow webserver is up and running. But even after going through the documentation, I am not clear where exactly I need to write the script for scheduling, or how that script becomes available in the Airflow webserver so that I can see its status.

As far as the configuration is concerned, I know where the dags folder is located in my home directory and also where the example DAGs are located.

Note: Please don't mark this as a duplicate of "How to run bash script file in Airflow", as I need to run Python files that live in a different location.

[Screenshot: configuration shown in the Airflow webserver]

[Screenshot: dags folder inside the AIRFLOW_HOME directory]

[Screenshot: DAG creation and the "missing DAG" error]

After I select the simple DAG, a "missing DAG" error is shown.

Solution

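You should probably use the PythonOperator to call your function. If you want to define the function somewhere else, you can simply import it from a module, as long as that module is accessible on your PYTHONPATH. Save the DAG definition below as a .py file in your dags folder, which is where the Airflow scheduler and webserver look for DAGs.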

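from datetime import datetime

from airflow import DAG
# Airflow 1.x import path, as in the original answer; in Airflow 2.x the
# operator lives in airflow.operators.python.
from airflow.operators.python_operator import PythonOperator

from my_script import my_python_function

# default_args was not defined in the original snippet; these are placeholder values.
default_args = {'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}

dag = DAG('tutorial', default_args=default_args)

my_task = PythonOperator(
    dag=dag,
    task_id='my_task_powered_by_python',
    provide_context=False,  # Airflow 1.x argument; deprecated in Airflow 2.x
    python_callable=my_python_function,
    op_args=['arguments_passed_to_callable'],
    op_kwargs={'keyword_argument': 'which will be passed to function'},
)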
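The answer does not show what my_script.py contains, so here is a minimal sketch of how such a module might look. The parameter names and the returned message are assumptions made for illustration. The point is only that PythonOperator calls python_callable(*op_args, **op_kwargs), so the function signature has to accept whatever the DAG file passes in.

```python
# my_script.py: hypothetical contents, not taken from the original answer.


def my_python_function(positional_argument, keyword_argument=None):
    """Stand-in for the real work (SQL extraction, joins, ...).

    positional_argument receives the single entry of op_args;
    keyword_argument receives the matching key of op_kwargs.
    """
    message = "got {!r} and {!r}".format(positional_argument, keyword_argument)
    print(message)
    return message


if __name__ == "__main__":
    # Calling the function directly, outside Airflow, is a quick way to
    # check the script before scheduling it.
    my_python_function(
        "arguments_passed_to_callable",
        keyword_argument="which will be passed to function",
    )
```

Keeping the logic in a plain function like this means the script can be tested on its own, and the DAG file only needs the import and the operator.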

If my_python_function is defined in the script file /path/to/my/scripts/dir/my_script.py, you can add that directory to PYTHONPATH before starting Airflow, like so:

export PYTHONPATH=/path/to/my/scripts/dir/:$PYTHONPATH
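If exporting PYTHONPATH is inconvenient (for instance, when Airflow is started by a service manager), one possible alternative is to extend sys.path at the top of the DAG file, before the import. This is not part of the original answer. The helper name ensure_on_sys_path is hypothetical, and the directory is the placeholder path used above.

```python
import sys

# Placeholder directory from the answer above; replace with the real location.
SCRIPTS_DIR = "/path/to/my/scripts/dir"


def ensure_on_sys_path(directory, path_list=None):
    """Prepend directory to the import search path once.

    Returns True if the directory was added, False if it was already present.
    """
    if path_list is None:
        path_list = sys.path
    if directory in path_list:
        return False
    path_list.insert(0, directory)
    return True


ensure_on_sys_path(SCRIPTS_DIR)

# With the directory on sys.path, the import from the DAG file can resolve,
# provided my_script.py actually exists there:
# from my_script import my_python_function
```

The trade-off is that the path is hard-coded in the DAG file rather than in the environment, so the PYTHONPATH approach is probably cleaner when several DAGs share the same scripts directory.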

More information here: https://airflow.incubator.apache.org/code.html#airflow.operators.PythonOperator

Default args and other considerations as in the tutorial: https://airflow.incubator.apache.org/tutorial.html
