Airflow - log the user who triggered the dag

Question

I'm trying to log the user who triggered my Airflow DAG (a DAG that terminates hanging Postgres queries), but it doesn't work. Can you help me figure out what's wrong? What am I missing? When I check the logs in Airflow, 'None' appears everywhere instead of the user name.

utils.py (where the session logic is described)

import logging
from airflow.models.log import Log
from airflow.utils.db import create_session
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import RealDictCursor
from plugins.platform.kw_postgres_hook import KwPostgresHook


# To test this, use this command:
# airflow tasks test killer_dag kill_query {date} -t '{"pid":"pid_value"}'
# Example:
# airflow tasks test killer_dag kill_query 20210803 -t '{"pid":"12345"}'


def kill_query(**kwargs):
    with create_session() as session:
        triggered_by = (
            session.query(Log.owner)
            .filter(
                Log.dag_id == "killer_dag",
                Log.event == "trigger",
                Log.execution_date == kwargs["execution_date"],
            )
            .limit(1)
            .scalar()
        )
    logging.info(
        f"'{triggered_by}' triggered the Killer_dag. Getting PID for the termination."
    )
    pid = kwargs["params"]["pid"]
    logging.info(f"This PID= '{pid}' is going to be terminated by '{triggered_by}'.")
    analdb_hook = KwPostgresHook(postgres_conn_id="anal_db")
    analdb_conn = analdb_hook.get_conn()
    analdb_cur = analdb_conn.cursor(cursor_factory=RealDictCursor)
    # Termination query receives pid as a parameter from the CLI.
    # The pid is passed as a bound parameter instead of being formatted into
    # the SQL string, so psycopg2 handles the quoting and no SQL can be injected.
    killer_query = "SELECT pg_terminate_backend(%s);"
    logging.info(killer_query)
    # Making sure the user provides an existing pid:
    # pg_terminate_backend returns a boolean; if it is False, a "not found" message is logged.
    analdb_cur.execute(killer_query, (int(pid),))
    result = analdb_cur.fetchone()
    exists = result["pg_terminate_backend"]
    if exists:
        logging.info(f"The pid = '{pid}' was terminated by '{triggered_by}'.")
    else:
        logging.info(f"The pid = '{pid}' not found, check it again!")
    return exists


def kill_hanging_queries(killer_dag):
    PythonOperator(
        task_id="kill_query",
        python_callable=kill_query,
        dag=killer_dag,
        provide_context=True,
    )

killer_dag.py

from datetime import datetime, timedelta
from airflow.models import DAG
from plugins.platform.utils import skyflow_email_list
from dags.utils.utils import kill_hanging_queries


killer_dag = DAG(
    dag_id="killer_dag",
    default_args={
        "owner": "Data Intelligence: Data Platform",
        "email": skyflow_email_list,
        "email_on_failure": True,
        "email_on_retry": False,
        "depends_on_past": False,
        "start_date": datetime(2021, 8, 8, 0, 0, 0),
        "retries": 0,
        "retry_delay": timedelta(minutes=1),
    },
)
kill_hanging_queries(killer_dag)

Answer

You get None because the query does not return any rows, and scalar() returns None when there is no result.

First of all, if you browse the logs in the Airflow UI (Browse > Audit Logs) and filter by dag_id and event, you will notice that execution_date is always empty and the timestamp is recorded in the Dttm field.

That's the main reason you are not getting results: the filter Log.execution_date == kwargs["execution_date"] never matches.
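To see why an empty execution_date leads straight to None, here is a minimal sketch that uses the standard library's sqlite3 as a stand-in for Airflow's metadata database. The table is a hypothetical, simplified version of Airflow's log table (only the columns used in the question), and first_scalar is a hypothetical helper that mimics what Query.scalar() does after .limit(1): return the first column of the first row, or None when there are no rows. Because execution_date is NULL for the trigger event, the equality filter is never true.

```python
import sqlite3

# Hypothetical, simplified stand-in for Airflow's "log" table. For a UI
# trigger, execution_date is left NULL and the timestamp lives in dttm.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE log (dttm TEXT, dag_id TEXT, event TEXT, execution_date TEXT, owner TEXT)"
)
conn.execute(
    "INSERT INTO log VALUES (?, ?, ?, ?, ?)",
    ("2021-08-11 23:05:00", "killer_dag", "trigger", None, "superUser"),
)


def first_scalar(sql, params):
    """Mimic Query.scalar() on a LIMIT 1 query: first column of first row, or None."""
    row = conn.execute(sql, params).fetchone()
    return row[0] if row is not None else None


# Equivalent of filtering on Log.execution_date == kwargs["execution_date"]:
# NULL = '<some date>' is never true in SQL, so no row matches.
by_date = first_scalar(
    "SELECT owner FROM log WHERE dag_id = ? AND event = ? AND execution_date = ? LIMIT 1",
    ("killer_dag", "trigger", "2021-08-11 23:05:00"),
)
print(by_date)  # None

# Dropping the date filter finds the trigger row.
without_date = first_scalar(
    "SELECT owner FROM log WHERE dag_id = ? AND event = ? LIMIT 1",
    ("killer_dag", "trigger"),
)
print(without_date)  # superUser
```

Note that the real Query.scalar() also raises MultipleResultsFound when more than one row comes back, which is why the question's query uses .limit(1) first.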

So, to achieve what you are looking for, you could follow this answer, where a similar query is performed. Taking that as a source, you could do something like the following to obtain the owner of the most recent trigger event (which is most likely the run that is currently executing) and avoid having to filter by date.

triggered_by = (
    session.query(Log.dttm, Log.dag_id, Log.execution_date, Log.owner)
    .filter(Log.dag_id == "killer_dag", Log.event == "trigger")
    .order_by(Log.dttm.desc())
    .first()[3]
)

The above returns a tuple with the requested fields; owner is the fourth element (index 3).
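The ordering and the index can be checked the same way. This sketch again uses sqlite3 with a hypothetical, simplified log table and made-up sample rows; it selects the same four columns as the query above, keeps only trigger events for killer_dag, and takes the newest one.

```python
import sqlite3

# Hypothetical in-memory stand-in for Airflow's "log" table (simplified columns).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE log (dttm TEXT, dag_id TEXT, event TEXT, execution_date TEXT, owner TEXT)"
)
conn.executemany(
    "INSERT INTO log VALUES (?, ?, ?, ?, ?)",
    [
        ("2021-08-10 09:00:00", "killer_dag", "trigger", None, "alice"),
        ("2021-08-11 23:05:00", "killer_dag", "trigger", None, "superUser"),
        ("2021-08-11 23:06:00", "killer_dag", "graph", None, "bob"),  # not a trigger event
        ("2021-08-11 23:07:00", "other_dag", "trigger", None, "carol"),  # different DAG
    ],
)

# Same shape as the answer's query: four columns, newest trigger first.
# ISO-formatted timestamps stored as text sort chronologically.
row = conn.execute(
    "SELECT dttm, dag_id, execution_date, owner FROM log "
    "WHERE dag_id = ? AND event = ? ORDER BY dttm DESC",
    ("killer_dag", "trigger"),
).fetchone()

# Indexes 0..3 map to dttm, dag_id, execution_date, owner, so owner is row[3].
triggered_by = row[3]
print(row)  # ('2021-08-11 23:05:00', 'killer_dag', None, 'superUser')
print(triggered_by)  # superUser
```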

Output:

[2021-08-11 23:05:29,481] {killer_dag.py:41} INFO - This PID= '123' is going to be terminated by 'superUser'.

Note:

Keep in mind that if you don't actually trigger the DAG (either manually or via the scheduler), there won't be any Log entry to query. Running airflow tasks test .. won't create any record with Log.event == "trigger". So before debugging further, make sure a Log entry to query actually exists; you can check this by browsing the UI as explained above.

To avoid TypeError: 'NoneType' object is not subscriptable when the query returns no rows, you could switch back to scalar(), which returns None instead of raising:

triggered_by = (
    session.query(Log.owner)
    .filter(Log.dag_id == "killer_dag", Log.event == "trigger")
    .order_by(Log.dttm.desc())
    .limit(1)
    .scalar()
)
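A sketch of the difference, again on a hypothetical sqlite3 stand-in table that is left empty on purpose (the situation after airflow tasks test, when no trigger event exists). Indexing the empty result reproduces the TypeError, while a helper that returns None for an empty result (as scalar() does) lets the caller log a fallback instead. The helper name latest_trigger_owner and the fallback text are illustrative choices, not part of Airflow.

```python
import sqlite3

# Hypothetical, simplified stand-in for Airflow's "log" table, left empty on
# purpose: no "trigger" event has been recorded.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE log (dttm TEXT, dag_id TEXT, event TEXT, execution_date TEXT, owner TEXT)"
)


def latest_trigger_owner(conn, dag_id):
    """Return the owner of the newest trigger event for dag_id, or None."""
    row = conn.execute(
        "SELECT owner FROM log WHERE dag_id = ? AND event = 'trigger' "
        "ORDER BY dttm DESC LIMIT 1",
        (dag_id,),
    ).fetchone()
    # fetchone() returns None for an empty result; check before indexing,
    # mirroring how scalar() returns None instead of raising.
    return row[0] if row is not None else None


# Indexing the empty result directly reproduces the error from the answer.
error_message = None
try:
    conn.execute("SELECT owner FROM log WHERE event = 'trigger'").fetchone()[0]
except TypeError as exc:
    error_message = str(exc)
print(error_message)  # 'NoneType' object is not subscriptable

owner = latest_trigger_owner(conn, "killer_dag")
# Hypothetical fallback text so the log line does not just say 'None'.
label = owner if owner is not None else "unknown (no trigger event found)"
print(label)  # unknown (no trigger event found)
```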

Let me know if that worked for you!