Airflow - log the user who triggered the dag
Question
I'm trying to log the user who triggered my DAG in Airflow (the DAG terminates hanging Postgres queries), but it doesn't work. Can you help me figure out what's wrong? What am I missing? When I check the logs in Airflow, there is 'None' everywhere instead of the user name.
utils.py (where the session logic is described)
```python
import logging

from airflow.models.log import Log
from airflow.utils.db import create_session
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import RealDictCursor

from plugins.platform.kw_postgres_hook import KwPostgresHook

# To test this use this command:
# airflow tasks test killer_dag kill_query {date} -t '{"pid":"pid_value"}'
# Example:
# airflow tasks test killer_dag kill_query 20210803 -t '{"pid":"12345"}'


def kill_query(**kwargs):
    with create_session() as session:
        triggered_by = (
            session.query(Log.owner)
            .filter(
                Log.dag_id == "killer_dag",
                Log.event == "trigger",
                Log.execution_date == kwargs["execution_date"],
            )
            .limit(1)
            .scalar()
        )
    logging.info(
        f"'{triggered_by}' triggered the Killer_dag. Getting PID for the termination."
    )
    pid = kwargs["params"]["pid"]
    logging.info(f"This PID= '{pid}' is going to be terminated by '{triggered_by}'.")

    analdb_hook = KwPostgresHook(postgres_conn_id="anal_db")
    analdb_conn = analdb_hook.get_conn()
    analdb_cur = analdb_conn.cursor(cursor_factory=RealDictCursor)

    # Termination query receives pid as a parameter from cli
    killer_query = f"""
    select pg_terminate_backend('{pid}');
    """
    logging.info(killer_query)

    # Making sure the user provides an existing pid:
    # the boolean result of the terminating select is checked, and if it is False a message is logged.
    analdb_cur.execute(killer_query)
    result = analdb_cur.fetchone()
    exists = result["pg_terminate_backend"]
    if exists == True:
        logging.info(f"The pid = '{pid}' was terminated by '{triggered_by}'.")
    else:
        logging.info(f"The pid = '{pid}' not found, check it again!")
    return exists


def kill_hanging_queries(killer_dag):
    PythonOperator(
        task_id="kill_query",
        python_callable=kill_query,
        dag=killer_dag,
        provide_context=True,
    )
```
killer_dag.py
```python
from datetime import datetime, timedelta

from airflow.models import DAG

from plugins.platform.utils import skyflow_email_list
from dags.utils.utils import kill_hanging_queries

killer_dag = DAG(
    dag_id="killer_dag",
    default_args={
        "owner": "Data Intelligence: Data Platform",
        "email": skyflow_email_list,
        "email_on_failure": True,
        "email_on_retry": False,
        "depends_on_past": False,
        "start_date": datetime(2021, 8, 8, 0, 0, 0),
        "retries": 0,
        "retry_delay": timedelta(minutes=1),
    },
)

kill_hanging_queries(killer_dag)
```
Recommended answer
You get `None` because the query does not return any results, so `scalar()` returns `None` as the default value.
First of all, if you browse the logs in the Airflow UI (Browse > Audit Logs) and filter by `dag_id` and `event`, you will notice that `execution_date` is always empty and that the datetime is recorded in the `Dttm` field.
That's the main reason you are not getting results: the filter `Log.execution_date == kwargs["execution_date"]` never matches.
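Here is a minimal sketch of why that filter comes back empty. It uses Python's built-in `sqlite3` module. The `log` table is a hypothetical, heavily simplified stand-in for Airflow's audit log table: it has only the columns discussed here and is not the real schema. The timestamp and owner are invented sample values. In SQL, comparing a NULL `execution_date` with any value is never true, so no row is returned. "First column of the first row" then degrades to `None`, which is what `scalar()` gives you.

```python
import sqlite3

# Hypothetical, simplified stand-in for Airflow's audit log table (not the real schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE log (dttm TEXT, dag_id TEXT, event TEXT, execution_date TEXT, owner TEXT)"
)
# A trigger entry as described above: execution_date is empty (NULL),
# and the timestamp is stored in dttm. Values are invented for the example.
conn.execute(
    "INSERT INTO log VALUES (?, ?, ?, ?, ?)",
    ("2021-08-11 23:05:00", "killer_dag", "trigger", None, "superUser"),
)

# Equivalent of filtering on execution_date == <the run's execution date>.
row = conn.execute(
    "SELECT owner FROM log WHERE dag_id = ? AND event = ? AND execution_date = ? LIMIT 1",
    ("killer_dag", "trigger", "2021-08-11 23:05:00"),
).fetchone()

# NULL = '<anything>' is never true, so fetchone() returns None; mimic scalar().
triggered_by = row[0] if row is not None else None
print(triggered_by)  # None

# Without an equality test on execution_date (here: IS NULL) the entry is found.
found = conn.execute(
    "SELECT owner FROM log WHERE dag_id = ? AND event = ? AND execution_date IS NULL LIMIT 1",
    ("killer_dag", "trigger"),
).fetchone()
print(found[0])  # superUser
```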
To achieve what you're looking for, you could follow this answer, where a similar query is performed. Using it as a basis, you could do something like the following. It gets the `owner` of the last `trigger` event (most likely the run that is actually executing) and avoids having to filter by dates.
```python
triggered_by = (
    session.query(Log.dttm, Log.dag_id, Log.execution_date, Log.owner)
    .filter(Log.dag_id == "killer_dag", Log.event == "trigger")
    .order_by(Log.dttm.desc())
    .first()[3]
)
```
The above returns a tuple with the requested fields, and the `owner` is the fourth one (index 3).
Output:
[2021-08-11 23:05:29,481] {killer_dag.py:41} INFO - This PID= '123' is going to be terminated by 'superUser'.
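The "latest trigger event" idea can be checked outside Airflow with a small sketch. It uses the same kind of hypothetical, simplified `sqlite3` table, not Airflow's real schema, and the sample rows are invented. Sorting by `dttm` in descending order and keeping the first row yields the tuple `(dttm, dag_id, execution_date, owner)`, so the owner is at index 3. The timestamps are ISO-formatted strings so that text ordering matches chronological ordering.

```python
import sqlite3

# Hypothetical, simplified stand-in for Airflow's audit log table (not the real schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE log (dttm TEXT, dag_id TEXT, event TEXT, execution_date TEXT, owner TEXT)"
)
conn.executemany(
    "INSERT INTO log VALUES (?, ?, ?, ?, ?)",
    [
        # Invented sample rows; ISO timestamps sort chronologically as text.
        ("2021-08-10 09:00:00", "killer_dag", "trigger", None, "alice"),
        ("2021-08-11 23:05:00", "killer_dag", "trigger", None, "superUser"),
        ("2021-08-11 23:10:00", "other_dag", "trigger", None, "bob"),
    ],
)

# Same shape as the SQLAlchemy query: four columns, filtered by DAG and event,
# newest first, one row kept.
latest = conn.execute(
    "SELECT dttm, dag_id, execution_date, owner FROM log "
    "WHERE dag_id = ? AND event = ? ORDER BY dttm DESC LIMIT 1",
    ("killer_dag", "trigger"),
).fetchone()

triggered_by = latest[3]  # index 3 is the fourth column: owner
print(latest)        # ('2021-08-11 23:05:00', 'killer_dag', None, 'superUser')
print(triggered_by)  # superUser
```

The newer `other_dag` row is ignored because of the `dag_id` filter, so only `killer_dag` triggers compete for "most recent".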
Note:
Keep in mind that if you don't actually trigger the DAG (either manually or via the scheduler), there won't be any `Log` entry to query. Running `airflow tasks test ..` won't create any record with `Log.event == "trigger"`. So before debugging further, make sure that a `Log` entry to query actually exists. You can check this by browsing the UI as explained above.
To avoid `TypeError: 'NoneType' object is not subscriptable` when the query returns no results, you could change the query to use `scalar()` again:
```python
triggered_by = (
    session.query(Log.owner)
    .filter(Log.dag_id == "killer_dag", Log.event == "trigger")
    .order_by(Log.dttm.desc())
    .limit(1)
    .scalar()
)
```
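The difference between the two query styles on an empty result can be sketched with plain `sqlite3`. An empty table is what you would see after `airflow tasks test`, when no trigger event was recorded. The table, the `QUERY` string, and the helper names `owner_by_index` and `owner_or_none` are hypothetical illustrations, not Airflow APIs. The first helper subscripts the row directly, like `.first()[3]`. The second turns a missing row into `None`, like `.limit(1).scalar()`.

```python
import sqlite3

# Hypothetical, simplified stand-in for Airflow's audit log table (not the real schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE log (dttm TEXT, dag_id TEXT, event TEXT, execution_date TEXT, owner TEXT)"
)
# No rows inserted: models a run where no "trigger" event was logged.

QUERY = (
    "SELECT owner FROM log WHERE dag_id = ? AND event = 'trigger' "
    "ORDER BY dttm DESC LIMIT 1"
)


def owner_by_index(conn, dag_id):
    """Hypothetical helper, like .first()[3]: subscripts the row, so no row raises TypeError."""
    return conn.execute(QUERY, (dag_id,)).fetchone()[0]


def owner_or_none(conn, dag_id):
    """Hypothetical helper, like .limit(1).scalar(): no row becomes None."""
    row = conn.execute(QUERY, (dag_id,)).fetchone()
    return row[0] if row is not None else None


try:
    owner_by_index(conn, "killer_dag")
except TypeError as exc:
    print(f"TypeError: {exc}")  # TypeError: 'NoneType' object is not subscriptable

print(owner_or_none(conn, "killer_dag"))  # None
```

Returning `None` keeps the task from crashing on test runs. The log messages will still show 'None' in that case, which is a hint that no trigger entry exists.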
Let me know if that worked for you!