从Airflow Postgres挂钩检索完整的连接URI [英] Retrieve full connection URI from Airflow Postgres hook
本文介绍了从Airflow Postgres挂钩检索完整的连接URI的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
是否有一种更整洁的方法来从Postgres挂钩中获取完整的URI? .get_uri()
不包含额外参数,因此我将其附加为:
Is there a neater way to get the complete URI from a Postgres hook? .get_uri()
doesn't include the "extra" params so I am appending them like this:
def pg_conn_id_to_uri(postgres_conn_id):
hook = PostgresHook(postgres_conn_id)
uri = hook.get_uri()
extra = hook.get_connection(postgres_conn_id).extra_dejson
params = [ f'{k}={v}' for k, v in extra.items() ]
if params:
params = '&'.join(params)
uri += f'?{params}'
return uri
推荐答案
如果清洁剂不一定在这里暗示简洁,那么这可能有用
If cleaner doesn't necessarily imply for brevity here, then here's something that might work
from typing import Dict, Any
from psycopg2 import extensions
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models.connection import Connection
def pg_conn_id_to_uri(postgres_conn_id: str) -> str:
# create hook & conn
hook: PostgresHook = PostgresHook(postgres_conn_id=postgres_conn_id)
conn: Connection = hook.get_connection(conn_id=postgres_conn_id)
# retrieve conn_args & extras
extras: Dict[str, Any] = conn.extra_dejson
conn_args: Dict[str, Any] = dict(
host=conn.host,
user=conn.login,
password=conn.password,
dbname=conn.schema,
port=conn.port)
conn_args_with_extras: Dict[str, Any] = {**conn_args, **extras}
# build and return string
conn_string: str = extensions.make_dsn(dsn=None, **conn_args_with_extras)
return conn_string
请注意,该代码段未经测试
当然,我们仍然可以从这里剪掉更多的行(例如,使用 python
的语法糖 conn)。 __dict __。items()
),但我更喜欢简洁而不是简洁
Of course we can still trim off some more lines from here (for e.g. by using python
's syntactic sugar conn.__dict__.items()
), but I prefer clarity over brevity
取自 Airflow
的& pyscopg2
的代码本身
The hints have been taken from Airflow
's & pyscopg2
's code itself
PostgresHook
pyscopg2
这篇关于从Airflow Postgres挂钩检索完整的连接URI的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!
查看全文