以编程方式更新Airflow中的连接时出现问题 [英] Problem updating the connections in Airflow programatically
问题描述
我正在尝试使用python更新Airflow连接。我创建了一个python函数,该函数从API获取身份验证令牌并更新Airflow中的连接的额外字段。
I am trying to update the Airflow connections using python. I have created a python function that takes an authentication token from API and updates the extra field of connection in Airflow.
我正在以json格式获取令牌,如下所示:
I am getting token in json format like below:
{
"token" : token_value
}
下面是我正在使用的python代码的一部分
Below is the part of python code that I am using
def set_token():
# Get token from API & update the Airflow Variables
Variable.set("token", str(auth_token))
new_token = Variables.get("token")
get_conn = Connection(conn_id="test_conn")
auth_token = { "header" : new_token}
get_conn.set_extra(str(auth_token))
但是,当我运行任务时,气流连接中的多余字段不会更新。我可以看到我的变量正在更新,但没有更新。有人可以让我知道我缺少什么吗?
But when I run the task, the extra field in airflow connection doesn't get updated. I can see that my Variable is getting updated but not connection. Could anyone please let me know what I am missing?
推荐答案
我怀疑您是从Airflow的meta-db中获取连接的吗?正确的方法。
I doubt that you are fetching connection from Airflow's meta-db in the right way.
- 如果通过
变量 https://github.com/apache/airflow/blob/v1-10-stable/airflow/models/variable.py#L103 rel = nofollow noreferrer> Variable.get()
方法,不应Connection
得到相同的待遇(尽管Connection
类没有get()
函数,必须有变通方法)? - 这里您只是实例化
Connection $带有给定参数
conn_id
的c $ c>对象(而不是从数据库中真正获取该conn_id
的连接)
- All things apart, if you are fetching
Variable
viaVariable.get()
method, shouldn'tConnection
be receiving the same treatment (althoughConnection
class doesn't have aget()
function, there must be a workaround)? - Here you are merely instantiating
Connection
object with a givenconn_id
argument (and not really fetching Connection for thatconn_id
from db)
每当我不得不利用底层的 SQLAlchemy
模型,我查看 cli.py
。从 connections()
函数,这是我认为应该起作用的功能
Whenever I have to exploit the underlying SQLAlchemy
models, I look at cli.py
. Taking cues from connections()
function, here's what I think should work
from airflow.models import Connection
from airflow.settings import Session
from airflow.utils.db import provide_session
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import exc
@provide_session
def update_conn_extra(conn_id: str, new_extra: Any, session: Optional[Session] = None) -> Optional[Connection]:
try:
my_conn: Optional[Connection] = (session
.query(Connection)
.filter(Connection.conn_id == conn_id)
.one())
except exc.NoResultFound:
my_conn: Optional[Connection] = None
except exc.MultipleResultsFound:
my_conn: Optional[Connection] = None
if my_conn:
my_conn.extra: Any = new_extra
session.add(my_conn)
session.commit()
请注意,这里我们使用更新的字段简单地覆盖了Connection(无需先删除现有字段),我发现工作了。如果遇到问题,可以在使用 session.delete(my_conn)
这篇关于以编程方式更新Airflow中的连接时出现问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!