Problem updating the connections in Airflow programmatically


Problem description

I am trying to update Airflow connections using Python. I have written a Python function that gets an authentication token from an API and updates the extra field of a connection in Airflow.

I receive the token in JSON format, like this:

{
   "token" : token_value
}

Below is the part of the Python code that I am using:

def set_token():
    # Get token from API & update the Airflow Variables
    Variable.set("token", str(auth_token))
    new_token = Variables.get("token")
    get_conn = Connection(conn_id="test_conn")
    auth_token = { "header" : new_token}
    get_conn.set_extra(str(auth_token))

But when I run the task, the extra field of the Airflow connection doesn't get updated. I can see that my Variable is updated, but the connection is not. Could anyone please let me know what I am missing?

Recommended answer

I doubt that you are fetching the connection from Airflow's meta-db in the right way.


  • All things apart, if you are fetching the Variable via the Variable.get() method (https://github.com/apache/airflow/blob/v1-10-stable/airflow/models/variable.py#L103), shouldn't the Connection receive the same treatment? (The Connection class doesn't have a get() function, but there must be a workaround.)
  • Here you are merely instantiating a Connection object with the given conn_id argument, not actually fetching the Connection for that conn_id from the db.
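To make the second point concrete, here is a toy sketch of the difference between instantiating an object and fetching, modifying and committing a stored row. It uses the standard library's sqlite3 as a stand-in for the meta-db. The table layout, the ToyConnection class and the stored_extra helper are all hypothetical and are not Airflow's schema or API.

```python
import sqlite3

# Toy stand-in for the metadata database; NOT Airflow's real schema.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE connection (conn_id TEXT PRIMARY KEY, extra TEXT)")
db.execute("INSERT INTO connection VALUES ('test_conn', 'old')")
db.commit()


class ToyConnection:
    """Hypothetical in-memory record, loosely analogous to a model object."""

    def __init__(self, conn_id, extra=None):
        self.conn_id = conn_id
        self.extra = extra


def stored_extra(conn_id):
    """Read the value that is actually persisted for conn_id."""
    row = db.execute(
        "SELECT extra FROM connection WHERE conn_id = ?", (conn_id,)
    ).fetchone()
    return row[0] if row else None


# 1) Instantiate only: the new object has no link to the stored row,
#    so changing its attribute leaves the database untouched.
detached = ToyConnection(conn_id="test_conn")
detached.extra = "new"
after_instantiate = stored_extra("test_conn")

# 2) Fetch, modify, write back and commit: the stored row changes.
fetched = ToyConnection("test_conn", stored_extra("test_conn"))
fetched.extra = "new"
db.execute(
    "UPDATE connection SET extra = ? WHERE conn_id = ?",
    (fetched.extra, fetched.conn_id),
)
db.commit()
after_update = stored_extra("test_conn")
```

The question's snippet follows the first pattern: the object is built and changed, but nothing is ever read from or written to the database.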

Whenever I have to work with the underlying SQLAlchemy models, I look at cli.py. Taking cues from its connections() function, here's what I think should work:

from typing import Any, Optional

from airflow.models import Connection
from airflow.utils.db import provide_session  # import path as in Airflow 1.10
from sqlalchemy.orm import Session, exc


@provide_session
def update_conn_extra(conn_id: str, new_extra: Any, session: Optional[Session] = None) -> Optional[Connection]:
    """Overwrite the extra field of an existing connection; return it, or None."""
    try:
        # Fetch the stored row from the meta-db instead of building a new object
        my_conn: Optional[Connection] = (session
                                         .query(Connection)
                                         .filter(Connection.conn_id == conn_id)
                                         .one())
    except (exc.NoResultFound, exc.MultipleResultsFound):
        # No connection, or more than one, exists with this conn_id
        my_conn = None
    if my_conn:
        my_conn.extra = new_extra
        session.add(my_conn)
        session.commit()
    return my_conn






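Note that here we simply overwrite the Connection with the updated field (without first deleting the existing one), which I have found to work. If you run into problems, you can delete the existing connection first with session.delete(my_conn).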
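One more detail may be worth checking in the question's snippet: it passes str(auth_token) as the extra value. As far as I know, Airflow reads the extra field as JSON (for example through Connection.extra_dejson), and str() of a dict gives Python's repr rather than JSON. A minimal sketch of the difference, with a placeholder token value and a hypothetical helper name:

```python
import json

new_token = "abc123"  # placeholder token value, for illustration only
payload = {"header": new_token}

as_repr = str(payload)         # Python repr of the dict (single quotes)
as_json = json.dumps(payload)  # JSON text (double quotes)


def parses_as_json(text):
    """Return True if text is valid JSON (hypothetical helper)."""
    try:
        json.loads(text)
        return True
    except ValueError:
        return False


repr_ok = parses_as_json(as_repr)
json_ok = parses_as_json(as_json)
```

Under that assumption, passing json.dumps(payload) rather than str(payload) as the new extra value would be the safer choice.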
