Wiring top-level DAGs together


Problem Description

I need to have several identical (differing only in arguments) top-level DAGs that can also be triggered together, with the following constraints/assumptions:

• Individual top-level DAGs will have schedule_interval=None as they will only need occasional manual triggering
• The series of DAGs, however, needs to run daily
• Order and number of DAGs in the series is fixed (known before writing the code) and changes rarely (once in a few months)
• Irrespective of whether a DAG fails or succeeds, the chain of triggering must not break
• Currently they must be run together in series; in the future they may require parallel triggering

So I created one file for each DAG in my dags directory, and now I must wire them up for sequential execution. I have identified two ways this could be done:

1. SubDagOperator

2. TriggerDagRunOperator

   • Works in my demo, but runs the DAGs in parallel (not sequentially), as it doesn't wait for the triggered DAG to finish before moving on to the next one (a sketch of this naive chaining follows below)
   • ExternalTaskSensor might help overcome the above limitation, but it would make things very messy
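
For concreteness, here is a minimal sketch of the naive chaining from point 2, against the Airflow 1.9 API. The DAG and task names follow the importer_* convention used later in this post, and the always-trigger callback is my own assumption (Airflow 1.9's TriggerDagRunOperator takes a python_callable that decides whether the run gets created):

    # Naive chaining of TriggerDagRunOperators (Airflow 1.9 API).
    # DAG / task names are illustrative, following the importer_* convention.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dagrun_operator import TriggerDagRunOperator


    def always_trigger(context, dag_run_obj):
        # Returning the dag_run_obj tells the operator to create the run.
        return dag_run_obj


    dag = DAG(
        dag_id='import_parent_v1',
        schedule_interval='@daily',
        start_date=datetime(2018, 1, 1),
    )

    triggers = [
        TriggerDagRunOperator(
            task_id='importer_v1_db_%d' % i,
            trigger_dag_id='importer_child_v1_db_%d' % i,
            python_callable=always_trigger,
            dag=dag,
        )
        for i in range(1, 4)
    ]

    # Chain importer_v1_db_1 >> importer_v1_db_2 >> importer_v1_db_3.
    # This serializes only the *triggering*: each child DAG starts running
    # as soon as its run is created, so the children still run in parallel.
    for upstream, downstream in zip(triggers, triggers[1:]):
        upstream.set_downstream(downstream)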


My questions are:

• How to overcome the limitation of the parent_id prefix in the dag_id of SubDags?
• How to force TriggerDagRunOperators to await completion of the DAG they trigger?
• Any alternate / better way to wire up independent (top-level) DAGs together?
• Is there a workaround for my approach of creating separate files (for DAGs that differ only in input) for each top-level DAG?

I'm using puckel/docker-airflow with:

• Airflow 1.9.0-4
• Python 3.6-slim
• CeleryExecutor with redis:3.2.7

EDIT-1

Clarifying @Viraj Parekh's queries:

Can you give some more detail on what you mean by awaiting completion of the DAG before getting triggered?

When I trigger the import_parent_v1 DAG, all 3 of the external DAGs that it is supposed to fire using TriggerDagRunOperator start running in parallel, even though I chained them sequentially. Actually, the logs indicate that while they are fired one after another, the execution moves on to the next DAG (TriggerDagRunOperator) before the previous one has finished. NOTE: in this example, the top-level DAGs are named importer_child_v1_db_X and their corresponding task_ids (for the TriggerDagRunOperator) are named importer_v1_db_X.

Would it be possible to just have the TriggerDagRunOperator be the last task in a DAG?

I have to chain several similar (differing only in arguments) DAGs together in a workflow that triggers them one by one. So there isn't just one TriggerDagRunOperator that I could put last; there are many (here 3, but up to 15 in production).

Solution

Taking hints from @Viraj Parekh's answer, I was able to make TriggerDagRunOperator work in the intended fashion. I'm hereby posting my (partial) answer; I will update it as and when things become clear.


How to overcome the limitation of the parent_id prefix in the dag_id of SubDags?

As @Viraj said, there's no straightforward way of achieving this. Extending SubDagOperator to remove this check might work, but I decided to steer clear of it.
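
To make the limitation concrete, a hedged sketch (all names here are hypothetical): SubDagOperator raises an exception at construction time unless the sub-DAG's dag_id is exactly parent_dag_id.task_id, so an independently usable top-level DAG cannot double as a SubDag without renaming:

    # SubDagOperator enforces the '<parent>.<task_id>' naming convention
    # (Airflow 1.9); all names here are hypothetical.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.subdag_operator import SubDagOperator

    parent = DAG(
        dag_id='import_parent_v1',
        schedule_interval='@daily',
        start_date=datetime(2018, 1, 1),
    )

    # The sub-DAG *must* be named '<parent.dag_id>.<task_id>' ...
    child = DAG(
        dag_id='import_parent_v1.importer_child_v1_db_1',
        schedule_interval='@daily',
        start_date=datetime(2018, 1, 1),
    )

    # ... otherwise this constructor raises an AirflowException.
    SubDagOperator(
        task_id='importer_child_v1_db_1',
        subdag=child,
        dag=parent,
    )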


How to force TriggerDagRunOperators to await completion of the DAG?

• Looking at the implementation, it becomes clear that the job of TriggerDagRunOperator is just to trigger the external DAG, and that's about it. By default, it is not supposed to wait for the DAG's completion, so the behaviour I'm observing is understandable.

• ExternalTaskSensor is the obvious way out. However, while learning the basics of Airflow, I was relying on manual triggering of DAGs (schedule_interval=None). In that case, ExternalTaskSensor makes it difficult to accurately specify the execution_date of the external task (whose completion is being awaited), failing which the sensor gets stuck.

• So, taking a hint from the implementation, I made a minor adjustment to the behaviour of ExternalTaskSensor: it now awaits completion of all task_instances of the concerned task having

    execution_date[external_task] >= execution_date[TriggerDagRunOperator] + execution_delta

  This achieves the desired result: the external DAGs run one after another in sequence. A sketch of the adjusted sensor follows this list.
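
Here is a hedged sketch of that adjustment, written as a subclass of Airflow 1.9's ExternalTaskSensor. The class name and the exact query are my own reconstruction of the behaviour described above, not the original code:

    # Sketch: an ExternalTaskSensor variant that waits until *all* instances
    # of the sensed task at or after a cutoff have reached an allowed state.
    from airflow import settings
    from airflow.models import TaskInstance
    from airflow.operators.sensors import ExternalTaskSensor


    class ExternalTaskCompletionSensor(ExternalTaskSensor):

        def poke(self, context):
            # Cutoff per the formula above: sensed execution_date must be
            # >= own execution_date + execution_delta. (The stock sensor
            # instead *subtracts* the delta and matches a single date.)
            dttm = context['execution_date']
            if self.execution_delta:
                dttm = dttm + self.execution_delta

            session = settings.Session()
            TI = TaskInstance
            common = [
                TI.dag_id == self.external_dag_id,
                TI.task_id == self.external_task_id,
                TI.execution_date >= dttm,
            ]
            # All matching instances of the external task ...
            total = session.query(TI).filter(*common).count()
            # ... versus those already in an allowed (e.g. success) state.
            done = session.query(TI).filter(*common).filter(
                TI.state.in_(self.allowed_states)).count()
            session.commit()
            session.close()
            return total > 0 and total == done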


Is there a workaround for my approach of creating separate files (for DAGs that differ only in input) for each top-level DAG?

Again going by @Viraj's answer, this can be done by assigning DAGs to the global scope using globals()[dag_id] = DAG(..)
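
A minimal sketch of that pattern, generating several near-identical DAGs from a single file (the dummy task and the names are illustrative):

    # One file, many DAGs: Airflow's DagBag only discovers DAG objects
    # found at module global scope, hence the globals()[dag_id] assignment.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    for db in ('db_1', 'db_2', 'db_3'):
        dag_id = 'importer_child_v1_%s' % db
        dag = DAG(
            dag_id=dag_id,
            schedule_interval=None,  # triggered externally, never scheduled
            start_date=datetime(2018, 1, 1),
        )
        # Placeholder task; the real import logic would differ only in `db`.
        DummyOperator(task_id='import_%s' % db, dag=dag)
        globals()[dag_id] = dag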


EDIT-1

Maybe I was referring to an incorrect resource (the link above is already dead), but ExternalTaskSensor already includes the params execution_delta & execution_date_fn to easily restrict the execution_date(s) of the task being sensed.
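
For reference, a minimal usage sketch of those stock parameters (the sensed task_id and the one-hour delta are made up). With execution_delta, the sensor polls for the external task's instance at its own execution_date minus the delta; execution_date_fn computes that date from the context instead:

    # Stock ExternalTaskSensor (Airflow 1.9); names and delta illustrative.
    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.sensors import ExternalTaskSensor

    dag = DAG(
        dag_id='import_parent_v1',
        schedule_interval='@daily',
        start_date=datetime(2018, 1, 1),
    )

    wait_for_db_1 = ExternalTaskSensor(
        task_id='wait_for_db_1',
        external_dag_id='importer_child_v1_db_1',
        external_task_id='import_db_1',      # hypothetical final task
        execution_delta=timedelta(hours=1),  # sensed run is one hour earlier
        dag=dag,
    )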
