Export environment variables at runtime with Airflow


Problem description

I am currently converting workflows that were previously implemented as bash scripts into Airflow DAGs. In the bash scripts, I simply exported the variables at run time with

export HADOOP_CONF_DIR="/etc/hadoop/conf"

Now I'd like to do the same in Airflow, but I haven't found a solution for this yet. The one workaround I found was setting the variables with os.environ[VAR_NAME] = 'some_text' outside of any method or operator, but that means they get exported the moment the script is loaded, not at run time.
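For illustration, a minimal sketch of that workaround (variable name and value taken from the example above) — anything set at module level runs when the scheduler parses the DAG file, not when a task instance executes:

import os

# Runs at DAG-parse time, i.e. whenever the scheduler/worker loads this file,
# not when the task actually runs.
os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"

# ... DAG and operator definitions follow ...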

Now when I try to call os.environ[VAR_NAME] = 'some_text' in a function that gets called by a PythonOperator, it does not work. My code looks like this:

import os

from airflow.operators.python_operator import PythonOperator


def set_env():
    # Try to export the Hadoop/Spark variables from within the running task.
    os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
    os.environ['PATH'] = "somePath:" + os.environ['PATH']
    os.environ['SPARK_HOME'] = "pathToSparkHome"
    os.environ['PYTHONPATH'] = "somePythonPath"
    os.environ['PYSPARK_PYTHON'] = os.popen('which python').read().strip()
    os.environ['PYSPARK_DRIVER_PYTHON'] = os.popen('which python').read().strip()

set_env_operator = PythonOperator(
    task_id='set_env_vars_NOT_WORKING',
    python_callable=set_env,
    dag=dag)

Now when my SparkSubmitOperator gets executed, I get the exception:

Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.

My use case where this is relevant is that I have a SparkSubmitOperator with which I submit jobs to YARN, so either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment. Setting them in my .bashrc or any other config is sadly not possible for me, which is why I need to set them at runtime.

Preferably I'd like to set them in an operator before executing the SparkSubmitOperator, but if there were a way to pass them as arguments to the SparkSubmitOperator, that would at least be something.

Recommended answer

From what I can see in the SparkSubmitOperator, you can pass environment variables to spark-submit as a dictionary:

:param env_vars: Environment variables for spark-submit. It
                 supports yarn and k8s mode too.
:type env_vars: dict

Have you tried that?
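For illustration, a minimal sketch of that suggestion, assuming the contrib SparkSubmitOperator and using a hypothetical application path and Spark connection id; the env_vars dictionary is handed to spark-submit by the operator:

from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

spark_submit = SparkSubmitOperator(
    task_id='spark_submit_to_yarn',
    application='/path/to/your_job.py',   # hypothetical application path
    conn_id='spark_default',              # assumes a configured Spark connection
    env_vars={
        'HADOOP_CONF_DIR': '/etc/hadoop/conf',
        'YARN_CONF_DIR': '/etc/hadoop/conf',
    },
    dag=dag)

If that behaves as documented, the variables reach the spark-submit process without having to export anything in .bashrc or in a separate upstream task.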
