Airflow ExternalTaskSensor with different scheduler interval

Problem description

Currently I have two DAGs: DAG_A and DAG_B. Both run with schedule_interval=timedelta(days=1).

DAG_A has a Task1 that usually takes 7 hours to run, while DAG_B takes only 3 hours.

DAG_B has an ExternalTaskSensor(external_dag_id="DAG_A", external_task_id="Task1") but also uses some other information, X, that is generated hourly.

What is the best way to increase the frequency of DAG_B so that it runs at least 4 times a day? As far as I know, both DAGs must have the same schedule_interval. However, I want to update X in DAG_B as often as I can.

One possibility is to create another DAG that has an ExternalTaskSensor for DAG_B. But I don't think it's the best way.

Recommended answer

If I understood you correctly, your conditions are:

  • Keep running DAG_A daily
  • Run DAG_B n times a day
  • Every time DAG_B runs it will wait for DAG_A__Task_1 to be completed

I think you could easily adapt your current design by instructing ExternalTaskSensor to wait for the desired execution date of DAG_A.

From the ExternalTaskSensor operator definition:

Waits for a different DAG or a task in a different DAG to complete for a specific execution_date
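"A specific execution_date" is the crux: by default the sensor looks for a DAG_A run whose execution_date equals DAG_B's own. The pure-Python sketch below (no Airflow needed) illustrates what that means once DAG_B runs several times a day. It assumes DAG_A's execution dates fall at midnight UTC and that DAG_B's daily interval is split evenly, i.e. schedule_interval=timedelta(days=1) / n; the helper dag_b_execution_dates is hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def dag_b_execution_dates(day_start: datetime, runs_per_day: int) -> List[datetime]:
    """Hypothetical helper: DAG_B's execution dates within one day, evenly spaced."""
    interval = timedelta(days=1) / runs_per_day  # e.g. 6 hours for 4 runs a day
    return [day_start + i * interval for i in range(runs_per_day)]


# Assumption: DAG_A runs daily with its execution date at midnight UTC.
day = datetime(2021, 1, 5, tzinfo=timezone.utc)
dag_a_dates = {day}
dag_b_dates = dag_b_execution_dates(day, runs_per_day=4)

# Default sensor behaviour: look for a DAG_A run with the *same* execution_date.
matched = [d for d in dag_b_dates if d in dag_a_dates]
print([d.strftime("%H:%M") for d in dag_b_dates])  # ['00:00', '06:00', '12:00', '18:00']
print(len(matched))  # 1
```

Only the midnight run of DAG_B has a DAG_A counterpart; the other three would wait for DAG_A runs that are never scheduled, which is why the execution date has to be mapped explicitly.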

That execution_date can be defined with the execution_date_fn parameter:

execution_date_fn (Optional[Callable]) – function that receives the current execution date as the first positional argument and optionally any number of keyword arguments available in the context dictionary, and returns the desired execution dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
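Put differently, the callable maps DAG_B's execution date to the DAG_A execution date(s) the sensor should check. A minimal pure-Python illustration, assuming the call shape quoted above (date first, context as keyword arguments): a fixed offset written as a function targets the same date as execution_delta, which subtracts the delta from the current execution date, and a function may also return a list of dates, in which case, as we understand the sensor, all of them must reach an allowed state. The names six_hours_earlier and last_two_midnights are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import List


def six_hours_earlier(execution_date: datetime, **context) -> datetime:
    # Same target as passing execution_delta=timedelta(hours=6) instead.
    return execution_date - timedelta(hours=6)


def last_two_midnights(execution_date: datetime, **context) -> List[datetime]:
    # Hypothetical: returning a list asks the sensor to check several DAG_A runs.
    midnight = execution_date.replace(hour=0, minute=0, second=0, microsecond=0)
    return [midnight - timedelta(days=1), midnight]


dag_b_date = datetime(2021, 1, 5, 12, tzinfo=timezone.utc)
print(six_hours_earlier(dag_b_date))  # 2021-01-05 06:00:00+00:00
print(last_two_midnights(dag_b_date))
```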

You could define the sensor like this:

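```python
from airflow.sensors.external_task import ExternalTaskSensor  # Airflow 2.x; 1.10.x: airflow.sensors.external_task_sensor

# _get_execution_date_of_dag_a (shown below) must be defined before this point.
wait_for_dag_a = ExternalTaskSensor(
    task_id='wait_for_dag_a',
    external_task_id="external_task_1",
    external_dag_id='dag_a_id',
    # 'failed' is allowed too, so DAG_B continues even if the DAG_A task failed
    allowed_states=['success', 'failed'],
    # picks which DAG_A execution_date to wait for
    execution_date_fn=_get_execution_date_of_dag_a,
    poke_interval=30,  # seconds between checks
)
```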

Here, _get_execution_date_of_dag_a queries the metadata database with get_last_dagrun, which returns the most recent DagRun of DAG_A, and hands its execution_date to the sensor:

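```python
from airflow.models.dag import get_last_dagrun
from airflow.utils.session import provide_session  # Airflow 2.x; 1.10.x: airflow.utils.db


@provide_session
def _get_execution_date_of_dag_a(exec_date, session=None, **kwargs):
    # Most recent DagRun of DAG_A in the metadata DB; by default
    # get_last_dagrun skips externally (manually) triggered runs.
    dag_a_last_run = get_last_dagrun('dag_a_id', session)
    # Assumes DAG_A has run at least once; otherwise this would be None.
    return dag_a_last_run.execution_date
```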
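If you would rather not query the database on every poke, a swapped-in alternative (not the approach above) is to compute DAG_A's execution date from DAG_B's. The sketch assumes both DAGs use fixed timedelta schedules, that DAG_A's start_date is known, and Airflow's convention that a run with execution_date D is triggered when its interval ends, at D + interval. The function latest_scheduled_dag_a_date and its parameters are hypothetical; it returns the latest DAG_A run already triggered when DAG_B's run starts.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def latest_scheduled_dag_a_date(
    dag_b_date: datetime,
    *,
    dag_a_start: datetime,
    dag_a_interval: timedelta = timedelta(days=1),
    dag_b_interval: timedelta = timedelta(hours=6),
) -> Optional[datetime]:
    """Latest DAG_A execution_date whose run was triggered by the time DAG_B's run starts.

    Assumes a run with execution_date D is triggered at D + its interval.
    """
    dag_b_trigger = dag_b_date + dag_b_interval
    # DAG_A run D has been triggered iff D + dag_a_interval <= dag_b_trigger.
    newest_allowed = dag_b_trigger - dag_a_interval
    if newest_allowed < dag_a_start:
        return None  # DAG_A has not finished a single interval yet
    # Snap down onto DAG_A's schedule grid (timedelta // timedelta gives an int).
    steps = (newest_allowed - dag_a_start) // dag_a_interval
    return dag_a_start + steps * dag_a_interval


start = datetime(2021, 1, 1, tzinfo=timezone.utc)
for hour in (0, 6, 12, 18):
    b_date = datetime(2021, 1, 5, hour, tzinfo=timezone.utc)
    print(hour, latest_scheduled_dag_a_date(b_date, dag_a_start=start))
```

Something like execution_date_fn=lambda dt, **_: latest_scheduled_dag_a_date(dt, dag_a_start=...) could wire it in, but check how your Airflow version calls the function and handle the None case before relying on it. Unlike the database query, this ties each DAG_B run to a fixed DAG_A date, so reruns and backfills look for the same DAG_A run every time; cron schedules are not covered by this sketch.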

I hope this approach helps you out. You can find a working example in this answer.
