How to run Airflow PythonOperator in a virtual environment

Question

I have several python files that I'm currently executing using BashOperator. This allows me the flexibility to choose the python virtual environment easily.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # ...
}

dag = DAG('python_tasks', default_args=default_args, schedule_interval="23 4 * * *")

# Run the script with the interpreter of an existing conda environment
t1 = BashOperator(
    task_id='task1',
    bash_command='~/anaconda3/envs/myenv/bin/python /python_files/python_task1.py',
    dag=dag)

How can I achieve the same using PythonOperator to something like this?

from airflow.operators.python_operator import PythonOperator
from python_files import python_task1

python_task = PythonOperator(
    task_id='python_task',
    python_callable=python_task1.main,
    dag=dag)

I assume PythonOperator will use the system python environment. I've found that Airflow has the PythonVirtualenvOperator, but this appears to work by creating a new virtual env on the fly using the specified requirements. I'd prefer to use an existing one that is already properly configured. How can I run PythonOperator with a specified python path?

Recommended answer

First things first: you should not (in general) rely on pre-existing resources for your Operators. Your operators should be portable, so using long-lived virtualenvs goes somewhat against that principle. That being said, it's not that big a deal: just as you have to preinstall packages into the global environment, you can pre-bake a few environments. Alternatively, you can let the Operator create the environment and have subsequent operators reuse it, which is, I believe, both the easiest and the most dangerous approach.
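
For the pre-baked route, one option (a sketch, not part of the original answer) is to keep a plain PythonOperator and have its callable launch the script with the existing environment's interpreter, which is essentially what the BashOperator in the question does. The helpers `venv_python` and `run_in_existing_venv` are hypothetical names, and the path logic assumes a standard venv/virtualenv layout:

```python
import os
import subprocess


def venv_python(venv_dir):
    """Hypothetical helper: interpreter path inside an environment directory.

    Assumes a venv/virtualenv layout (bin/python on POSIX, Scripts on Windows);
    conda envs on Windows keep python.exe at the env root instead.
    """
    if os.name == "nt":
        return os.path.join(venv_dir, "Scripts", "python.exe")
    return os.path.join(venv_dir, "bin", "python")


def run_in_existing_venv(python_bin, script_path, *args):
    """Hypothetical helper: run script_path with a pre-baked interpreter.

    check=True raises CalledProcessError on a non-zero exit code, so the
    task fails the same way a failing BashOperator command would.
    """
    result = subprocess.run(
        [python_bin, script_path, *args],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
        check=True,
    )
    return result.stdout
```

In a DAG this could be wired up through PythonOperator's `op_args`, e.g. `PythonOperator(task_id='task1', python_callable=run_in_existing_venv, op_args=[venv_python(os.path.expanduser('~/anaconda3/envs/myenv')), '/python_files/python_task1.py'], dag=dag)`. Because the script runs in a separate process, its imports resolve against the chosen environment rather than Airflow's own.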

Implementing a "virtualenv cache" shouldn't be difficult. Reading the implementation of PythonVirtualenvOperator's execution method:

def execute_callable(self):
    with TemporaryDirectory(prefix='venv') as tmp_dir:
        ...
        self._execute_in_subprocess(
            self._generate_python_cmd(tmp_dir,
                                      script_filename,
                                      input_filename,
                                      output_filename,
                                      string_args_filename))
        return self._read_result(output_filename)
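
The `with TemporaryDirectory(...)` block is what throws the environment away. In Airflow 1.10 that name is, I believe, a small helper from `airflow.utils.file` rather than the standard-library class. The sketch below reproduces the general mkdtemp / try / finally / rmtree pattern under a hypothetical name, `deleting_temporary_directory`; it is not Airflow's exact code:

```python
import os
import shutil
import tempfile
from contextlib import contextmanager


@contextmanager
def deleting_temporary_directory(suffix="", prefix=None, dir=None):
    """Create a fresh directory and remove it, with its contents, on exit."""
    name = tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dir)
    try:
        yield name
    finally:
        # This cleanup is what discards a freshly built virtualenv
        shutil.rmtree(name, ignore_errors=True)


with deleting_temporary_directory(prefix="venv") as tmp_dir:
    # Stand-in for the files a virtualenv would create
    open(os.path.join(tmp_dir, "pyvenv.cfg"), "w").close()
    existed_inside = os.path.isdir(tmp_dir)

print(existed_inside, os.path.exists(tmp_dir))  # True False
```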

So it looks like it doesn't delete the virtualenv explicitly (it relies on TemporaryDirectory to do that). You can subclass PythonVirtualenvOperator and simply use your own context manager that reuses temporary directories:

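import glob
import os
import tempfile
from contextlib import contextmanager

from airflow.operators.python_operator import PythonVirtualenvOperator


@contextmanager
def ReusableTemporaryDirectory(prefix):
    try:
        # Reuse the first existing directory with this prefix, if any;
        # mkdtemp also creates its directories under gettempdir()
        existing = glob.glob(os.path.join(tempfile.gettempdir(), prefix + '*'))
        if existing:
            name = existing[0]
        else:
            name = tempfile.mkdtemp(prefix=prefix)
        yield name
    finally:
        # simply don't delete the tmp dir
        pass


# Hypothetical subclass name; only execute_callable changes
class CachedPythonVirtualenvOperator(PythonVirtualenvOperator):
    def execute_callable(self):
        with ReusableTemporaryDirectory(prefix='cached-venv') as tmp_dir:
            ...  # rest of the original method body, unchanged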

Naturally, you can get rid of the try-finally in ReusableTemporaryDirectory and put back the usual suffix and dir arguments; I made minimal changes so that it is easy to compare with the original TemporaryDirectory class.
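
Following that suggestion, a version without the try-finally and with suffix and dir restored might look like this. It is a sketch: the glob-based lookup comes from the answer, while the directory-only filter, the sorting, and `glob.escape` are additions made here:

```python
import glob
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def ReusableTemporaryDirectory(suffix="", prefix="tmp", dir=None):
    """Yield an existing directory matching prefix/suffix, else a new one.

    Nothing is deleted on exit, so a later call with the same arguments
    gets the same directory (and whatever was installed in it) back.
    """
    base = dir if dir is not None else tempfile.gettempdir()
    pattern = os.path.join(base, glob.escape(prefix) + "*" + glob.escape(suffix))
    # Only count directories; sort so repeated calls pick the same match
    existing = sorted(p for p in glob.glob(pattern) if os.path.isdir(p))
    name = existing[0] if existing else tempfile.mkdtemp(suffix=suffix, prefix=prefix, dir=dir)
    yield name


base = tempfile.mkdtemp()  # isolated parent directory for the demo
with ReusableTemporaryDirectory(prefix="cached-venv", dir=base) as first:
    pass
with ReusableTemporaryDirectory(prefix="cached-venv", dir=base) as second:
    pass
print(first == second, os.path.isdir(first))  # True True
```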

With this, your virtualenv won't be discarded, but the Operator will eventually install newer versions of the dependencies into it.
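
One way to limit that drift, not part of the original answer, is to derive the cache prefix from the requirement list, so that a changed list gets a fresh environment instead of upgrading the cached one. `venv_cache_prefix` is a hypothetical helper:

```python
import hashlib


def venv_cache_prefix(requirements, python_version=None):
    """Hypothetical helper: map a requirement list to a cache-dir prefix.

    Sorting makes the key independent of list order; any change to the
    pins or to the Python version yields a different prefix.
    """
    key = "\n".join(sorted(requirements)) + "\npython=" + str(python_version)
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()[:12]
    return "cached-venv-" + digest + "-"


print(venv_cache_prefix(["pandas==1.0.5", "requests"]) ==
      venv_cache_prefix(["requests", "pandas==1.0.5"]))  # True
```

Inside the subclass described above, the fixed `prefix='cached-venv'` could then become `prefix=venv_cache_prefix(self.requirements, self.python_version)`, assuming the operator stores its arguments under those attribute names. Old prefixes are never cleaned up, so stale environments accumulate in the temp directory.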
