Airflow ExternalTaskSensor with different schedule intervals
Question
Currently I have two DAGs: DAG_A and DAG_B. Both run with schedule_interval=timedelta(days=1).
DAG_A has a task, Task1, that usually takes 7 hours to run, while DAG_B takes only 3 hours.
DAG_B has an ExternalTaskSensor(external_dag_id="DAG_A", external_task_id="Task1") but also uses some other information, X, that is generated hourly.
What is the best way to increase the frequency of DAG_B so that it runs at least 4 times a day? As far as I know, both DAGs must have the same schedule_interval. However, I want to update X in DAG_B as often as I can.
One possibility is to create another DAG that has an ExternalTaskSensor for DAG_B, but I don't think that is the best way.
Answer
If I understood you correctly, your conditions are:
- Keep running DAG_A daily
- Run DAG_B n times a day
- Every time DAG_B runs, it waits for DAG_A__Task_1 to complete
I think you could easily adapt your current design by instructing ExternalTaskSensor to wait for the desired execution date of DAG_A.
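To see why the default behaviour breaks once the intervals differ: with neither execution_delta nor execution_date_fn set, the sensor looks for the DAG_A run whose execution_date equals the current DAG_B run's. The stdlib-only sketch below models that matching for one day. It is not Airflow code, and it assumes (hypothetically) that DAG_A runs daily at midnight and DAG_B every 6 hours.

```python
from datetime import datetime, timedelta

# Hypothetical schedules: DAG_A daily at midnight, DAG_B every 6 hours.
DAG_A_INTERVAL = timedelta(days=1)
DAG_B_INTERVAL = timedelta(hours=6)


def execution_dates(start, interval, end):
    """Execution dates a fixed-interval schedule produces in [start, end)."""
    dates = []
    current = start
    while current < end:
        dates.append(current)
        current += interval
    return dates


def default_sensor_target(dag_b_execution_date):
    """Without execution_delta/execution_date_fn, the sensor targets the
    external run with the *same* execution_date as the current run."""
    return dag_b_execution_date


day_start = datetime(2021, 1, 1)
day_end = day_start + timedelta(days=1)
dag_a_dates = set(execution_dates(day_start, DAG_A_INTERVAL, day_end))
dag_b_dates = execution_dates(day_start, DAG_B_INTERVAL, day_end)

# Only DAG_B runs whose target matches an existing DAG_A run can ever succeed.
matched = [d for d in dag_b_dates if default_sensor_target(d) in dag_a_dates]
print(len(dag_b_dates), len(matched))  # 4 1
```

Only the midnight DAG_B run has a DAG_A counterpart; the other three would keep poking until the sensor times out.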
From the ExternalTaskSensor operator definition:
Waits for a different DAG or a task in a different DAG to complete for a specific execution_date
That execution_date can be defined with the execution_date_fn parameter:
execution_date_fn (Optional[Callable]) – function that receives the current execution date as the first positional argument and optionally any number of keyword arguments available in the context dictionary, and returns the desired execution dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
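The difference between the two options matters here: execution_delta is one fixed timedelta subtracted from every run's execution date, while execution_date_fn can compute any date (Airflow also accepts a list of dates from it). The stdlib sketch below contrasts them. previous_dag_a_run is a hypothetical function, not the answer's, and it assumes DAG_A is scheduled daily at midnight with each run starting once its interval has ended (Airflow's default).

```python
from datetime import datetime, timedelta


def target_with_delta(execution_date, execution_delta):
    # execution_delta: one fixed offset, subtracted from every run's date.
    return execution_date - execution_delta


def previous_dag_a_run(execution_date, **context):
    # Hypothetical execution_date_fn: point every DAG_B run of a day to the
    # DAG_A run dated midnight of the previous day. Under the assumptions
    # above, that DAG_A run has already started by the time any of these
    # DAG_B runs is scheduled. Extra context kwargs are ignored.
    midnight = execution_date.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight - timedelta(days=1)


dag_b_runs = [datetime(2021, 1, 1, h) for h in (0, 6, 12, 18)]

# A single 6-hour delta lands on midnight only for the 06:00 run.
print([target_with_delta(d, timedelta(hours=6)).hour for d in dag_b_runs])
# [18, 0, 6, 12]

# A function can send all four runs to the same DAG_A execution date.
print({previous_dag_a_run(d) for d in dag_b_runs})
# {datetime.datetime(2020, 12, 31, 0, 0)}
```

A computed mapping like this avoids a database query but hard-codes DAG_A's schedule; the approach below instead asks the database which DAG_A run exists.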
You could define the sensor like this:
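# Airflow 2.x import; in Airflow 1.10 use airflow.sensors.external_task_sensor
from airflow.sensors.external_task import ExternalTaskSensor

# Defined as a task of DAG_B (e.g. inside its `with DAG(...)` block).
# _get_execution_date_of_dag_a (below) must be defined earlier in the file.
wait_for_dag_a = ExternalTaskSensor(
    task_id='wait_for_dag_a',
    external_task_id="external_task_1",
    external_dag_id='dag_a_id',
    # Both states count as "done", so DAG_B continues even if the task failed
    allowed_states=['success', 'failed'],
    execution_date_fn=_get_execution_date_of_dag_a,
    poke_interval=30  # seconds between checks
)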
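Each poke, repeated every poke_interval seconds (30 here), roughly checks whether the external task instance for every date returned by execution_date_fn is in one of allowed_states. The stdlib model below is a simplification for illustration, not Airflow's implementation; task_states is a hypothetical stand-in for the metadata database.

```python
from datetime import datetime


def poke(task_states, target_dates, allowed_states=("success",)):
    """Simplified model of one sensor check.

    task_states maps execution_date -> state of external_task_1 in DAG_A.
    Succeeds only if every target date has a task in an allowed state.
    """
    count_allowed = sum(
        1 for d in target_dates if task_states.get(d) in allowed_states
    )
    return count_allowed == len(target_dates)


run = datetime(2021, 1, 1)
allowed = ["success", "failed"]  # same list as in the sensor above

print(poke({run: "running"}, [run], allowed))  # False: keep waiting
print(poke({run: "failed"}, [run], allowed))   # True: "failed" counts as done
print(poke({}, [run], allowed))                # False: no such run yet
```

If DAG_B should not proceed after a failed Task1, leaving allowed_states at its default of ['success'] is the safer choice.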
Here, _get_execution_date_of_dag_a queries the metadata database with get_last_dagrun, which lets you get the last execution_date of DAG_A.
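Roughly, get_last_dagrun returns the most recent run of the given DAG ordered by execution_date, skips externally (e.g. manually) triggered runs unless include_externally_triggered=True, and returns None when nothing matches. A run that is still in progress also counts, in which case the sensor simply waits for its task. The stdlib model below mimics that selection over an in-memory list; DagRun here is a hypothetical stand-in for a row of the dag_run table, not Airflow's model class.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class DagRun:
    # Hypothetical stand-in for a row in Airflow's dag_run table.
    dag_id: str
    execution_date: datetime
    external_trigger: bool = False


def last_dagrun(runs: List[DagRun], dag_id: str,
                include_externally_triggered: bool = False) -> Optional[DagRun]:
    """Latest run of dag_id by execution_date, or None if there is none."""
    candidates = [
        r for r in runs
        if r.dag_id == dag_id
        and (include_externally_triggered or not r.external_trigger)
    ]
    return max(candidates, key=lambda r: r.execution_date, default=None)


runs = [
    DagRun("dag_a_id", datetime(2021, 1, 1)),
    DagRun("dag_a_id", datetime(2021, 1, 2)),
    DagRun("dag_a_id", datetime(2021, 1, 2, 9), external_trigger=True),
    DagRun("dag_b_id", datetime(2021, 1, 3)),
]
print(last_dagrun(runs, "dag_a_id").execution_date)  # 2021-01-02 00:00:00
print(last_dagrun(runs, "missing_dag"))             # None
```

The None case is worth keeping in mind: reading .execution_date on it raises AttributeError, for example before DAG_A has ever run.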
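# Airflow 2.x location; Airflow 1.10 exposes it as airflow.utils.db.provide_session
from airflow.utils.session import provide_session
from airflow.models.dag import get_last_dagrun

@provide_session
def _get_execution_date_of_dag_a(exec_date, session=None, **kwargs):
    # Most recent non-externally-triggered run of DAG_A, in any state
    dag_a_last_run = get_last_dagrun(
        'dag_a_id', session)
    # NOTE: get_last_dagrun returns None if DAG_A has no such run yet
    return dag_a_last_run.execution_date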
I hope this approach helps you out. You can find a working example in this answer.