Create and use Connections in Airflow operator at runtime


Problem description

Note: this question is different from:

  • Export environment variables at runtime with airflow
  • Set Airflow Env Vars at Runtime

I have to trigger certain tasks at remote systems from my Airflow DAG. The straight-forward way to achieve this is SSHHook.


The problem is that the remote system is an EMR cluster which is itself created at runtime (by an upstream task) using EmrCreateJobFlowOperator. So while I can get hold of the job_flow_id of the launched EMR cluster (using XCOM), what I need is an ssh_conn_id to be passed to each downstream task.


Looking at the docs and code, it is evident that Airflow will try to look up this connection (using conn_id) in the db and in environment variables, so the problem boils down to being able to set either of these two properties at runtime (from within an operator).


This seems a rather common problem because if this isn't achievable then the utility of EmrCreateJobFlowOperator would be severely hampered; but I haven't come across any example demonstrating it.


  • Is it possible to create (and also destroy) either of these from within an Airflow operator?

  1. Connection (persisted in Airflow's db)
  2. Environment Variable (should be accessible to all downstream tasks and not just current task as told here)


  • If not, what are my options?

  • I'm using



    • Airflow v1.10
    • Python 3.6.6
    • emr-5.15 (can upgrade if required)

    Answer


    Connections come from the ORM

    Yes, you can create connections at runtime, even at DAG creation time if you're careful enough. Airflow is completely transparent on its internal models, so you can interact with the underlying SqlAlchemy directly. As exemplified originally in this answer, it's as easy as:

    from airflow import settings
    from airflow.models import Connection

    def create_conn(username, password, host=None):
        # Build the Connection object; set_password() stores the
        # password encrypted with Airflow's Fernet key.
        new_conn = Connection(conn_id=f'{username}_connection',
                              login=username,
                              host=host if host else None)
        new_conn.set_password(password)

        # Persist it through Airflow's SqlAlchemy session so that any
        # task can later look it up by conn_id.
        session = settings.Session()
        session.add(new_conn)
        session.commit()


    Where you can, of course, interact with any other extra Connection properties you may require for the EMR connection.
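    For instance, hooks such as SSHHook read their options from the Connection's Extra field, which is stored as a JSON string. A minimal sketch of building it (the key names and the PEM path below are assumptions for illustration; check the docs of the hook you actually use):

```python
import json

# Hypothetical extras for an SSH-style connection to the EMR master node.
# Key names depend on the consuming hook; the path is a placeholder.
extra = {
    "key_file": "/path/to/emr_key.pem",
    "no_host_key_check": "true",
}
extra_json = json.dumps(extra)

# Attach it before persisting, e.g.:
#   new_conn.set_extra(extra_json)
#   session.add(new_conn)
```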


    This is not a limitation of Airflow or Python, but (AFAIK on every major OS) environments are bound to the lifetime of a process. When you export a variable in bash, for example, you're simply stating that when you spawn child processes, you want that variable copied into the child's environment. This means that the parent process can't change the child's environment after its creation, and the child can't change the parent's environment.
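    The copy-at-spawn behaviour can be demonstrated with the standard library alone, a small sketch:

```python
import os
import subprocess
import sys

# A child process receives a *copy* of the environment taken at spawn time.
env = dict(os.environ, MY_VAR="before")
out = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['MY_VAR'])"],
    env=env, capture_output=True, text=True,
).stdout.strip()
print(out)  # the child saw the value captured at spawn

# Mutating the parent's mapping afterwards cannot reach a child that has
# already been spawned, and a child's own changes never propagate back up.
env["MY_VAR"] = "after"  # only affects processes spawned from now on
```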


    In short, only the process itself can change its environment after it's created. And considering that worker processes are Airflow subprocesses, it's hard to control the creation of their environments as well. What you can do is write the environment variables into a file, and intentionally update the current environment with overrides from that file on each task start.
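    A minimal sketch of that file-based approach (the KEY=VALUE format, the helper name, and the variable name are assumptions; there is no quoting or escaping support):

```python
import os
import tempfile

def apply_env_overrides(path):
    """Read KEY=VALUE lines from `path` and override the current
    process environment. Blank lines and '#' comments are skipped."""
    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            key, _, value = line.partition("=")
            os.environ[key.strip()] = value.strip()

# Example: an upstream task writes the file once, and every downstream
# task calls apply_env_overrides() at its start.
with tempfile.NamedTemporaryFile("w", suffix=".env", delete=False) as f:
    f.write("# written by the upstream task\n")
    f.write("EMR_MASTER_DNS=ec2-198-51-100-1.compute-1.amazonaws.com\n")
    path = f.name

apply_env_overrides(path)
print(os.environ["EMR_MASTER_DNS"])
```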

