我可以以编程方式确定是否安排了Airflow DAG还是手动触发了它? [英] Can I programmatically determine if an Airflow DAG was scheduled or manually triggered?

查看:481
本文介绍了我可以以编程方式确定是否安排了Airflow DAG还是手动触发了它?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想创建一个片段,该片段基于DAG是计划的还是手动触发的来传递正确的日期。 DAG每月运行一次。 DAG根据上个月的数据生成报告(SQL查询)。

I want to create a snippet that passes the correct date based on whether the DAG was scheduled or whether it was triggered manually. The DAG runs monthly. The DAG generates a report (A SQL query) based on the data of the previous month.

如果我按计划运行DAG,则可以使用以下命令获取上个月的数据: jinja片段:

If I run the DAG scheduled, I can fetch the previous month with the following jinja snippet:

execution_date.month

鉴于DAG计划在上一个周期(上个月)结束时,execution_date将正确返回上个月。但是,在手动运行时,它将返回当前月份(执行日期将是手动触发的日期)。

given that the DAG is scheduled at the end of the previous period (last month) the execution_date will correctly return the last month. However on manual runs this will return the current month (execution date will be the date of the manual trigger).

我想编写一个处理这种情况的简单宏。但是,我找不到以编程方式查询DAG是否以编程方式触发的好方法。我能想到的最好的办法是从数据库中获取 run_id (通过创建具有数据库会话的宏),然后检查 run_id 包含单词 manual 。有没有更好的方法来解决此问题?

I want to write a simple macro that deals with this case. However I could not find a good way to programmatically query whether the DAG is triggered programmatically. The best I could come up with is to fetch the run_id from the database (by creating a macro that has a DB session), check wheter the run_id contains the word manual. Is there a better way to solve this problem?

推荐答案

目前没有直接的DAG属性可以识别手动运行。
要获取此信息,您需要检查您提到的 run_id

There is no direct DAG property to identify manual runs for now. To get this information you would need to check the run_id as you mentioned.

但是,有一个专用宏可以获取 run_id 。您不必自己从数据库中获取数据。
以下是如何使用它的示例:

However, there is a dedicated macro get the run_id. You don't have to fetch it from the database by yourself. Here is an example on how to use it :

    def some_task_py(**context):
        run_id = context['templates_dict']['run_id']
        is_manual = run_id.startswith('manual__')
        is_scheduled = run_id.startswith('scheduled__')


    some_task = PythonOperator(
                task_id = 'some_task',
                dag=dag,
                templates_dict = {'run_id': '{{ run_id }}'},
                python_callable = some_task_py,
                provide_context = True)

这篇关于我可以以编程方式确定是否安排了Airflow DAG还是手动触发了它?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆