我可以以编程方式确定是否安排了Airflow DAG还是手动触发了它? [英] Can I programmatically determine if an Airflow DAG was scheduled or manually triggered?
问题描述
我想创建一个片段,该片段基于DAG是计划的还是手动触发的来传递正确的日期。 DAG每月运行一次。 DAG根据上个月的数据生成报告(SQL查询)。
I want to create a snippet that passes the correct date based on whether the DAG was scheduled or whether it was triggered manually. The DAG runs monthly. The DAG generates a report (A SQL query) based on the data of the previous month.
如果我按计划运行DAG,则可以使用以下命令获取上个月的数据: jinja片段:
If I run the DAG scheduled, I can fetch the previous month with the following jinja snippet:
execution_date.month
鉴于DAG计划在上一个周期(上个月)结束时,execution_date将正确返回上个月。但是,在手动运行时,它将返回当前月份(执行日期将是手动触发的日期)。
given that the DAG is scheduled at the end of the previous period (last month) the execution_date will correctly return the last month. However on manual runs this will return the current month (execution date will be the date of the manual trigger).
我想编写一个处理这种情况的简单宏。但是,我找不到以编程方式查询DAG是否以编程方式触发的好方法。我能想到的最好的办法是从数据库中获取 run_id
(通过创建具有数据库会话的宏),然后检查 run_id
包含单词 manual
。有没有更好的方法来解决此问题?
I want to write a simple macro that deals with this case. However I could not find a good way to programmatically query whether the DAG is triggered programmatically. The best I could come up with is to fetch the run_id
from the database (by creating a macro that has a DB session), check wheter the run_id
contains the word manual
. Is there a better way to solve this problem?
推荐答案
目前没有直接的DAG属性可以识别手动运行。
要获取此信息,您需要检查您提到的 run_id
。
There is no direct DAG property to identify manual runs for now.
To get this information you would need to check the run_id
as you mentioned.
但是,有一个专用宏可以获取 run_id
。您不必自己从数据库中获取数据。
以下是如何使用它的示例:
However, there is a dedicated macro get the run_id
. You don't have to fetch it from the database by yourself.
Here is an example on how to use it :
def some_task_py(**context):
run_id = context['templates_dict']['run_id']
is_manual = run_id.startswith('manual__')
is_scheduled = run_id.startswith('scheduled__')
some_task = PythonOperator(
task_id = 'some_task',
dag=dag,
templates_dict = {'run_id': '{{ run_id }}'},
python_callable = some_task_py,
provide_context = True)
这篇关于我可以以编程方式确定是否安排了Airflow DAG还是手动触发了它?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!