Wiring top-level DAGs together

Question
I need to have several identical (differing only in arguments) top-level DAGs that can also be triggered together, with the following constraints / assumptions:

- Individual top-level DAGs will have schedule_interval=None as they will only need occasional manual triggering
- The series of DAGs, however, needs to run daily
- Order and number of DAGs in series is fixed (known ahead of writing code) and changes rarely (once in a few months)
- Irrespective of whether a DAG fails or succeeds, the chain of triggering must not break
- Currently they must be run together in series; in future they may require parallel triggering
So I created one file for each DAG in my dags directory and now I must wire them up for sequential execution. I have identified two ways this could be done:

SubDagOperator
- Works without a glitch in my demo
- Can lead to deadlocks but there are easy solutions; still there's a lot of haze around using them
- A SubDag's dag_id must be prefixed by its parent's, which would force absurd IDs on top-level DAGs that are supposed to be functional independently too
TriggerDagRunOperator

- Works in my demo but runs in parallel (not sequentially) as it doesn't wait for the triggered DAG to finish before moving on to the next one
- ExternalTaskSensor might help overcome the above limitation but it would make things very messy
My questions are:

- How to overcome the limitation of the parent_id prefix in the dag_id of SubDags?
- How to force TriggerDagRunOperators to await completion of a DAG?
- Is there an alternate / better way to wire up independent (top-level) DAGs together?
- Is there a workaround for my approach of creating separate files (for DAGs that differ only in input) for each top-level DAG?
I'm using puckel/docker-airflow with:

- Airflow 1.9.0-4
- Python 3.6-slim
- CeleryExecutor with redis:3.2.7
EDIT-1
Clarifying @Viraj Parekh's queries
Can you give some more detail on what you mean by awaiting completion of the DAG before getting triggered?
When I trigger the import_parent_v1 DAG, all 3 external DAGs that it is supposed to fire using TriggerDagRunOperator start running in parallel, even when I chain them sequentially. Actually, the logs indicate that while they are fired one after another, the execution moves on to the next DAG (TriggerDagRunOperator) before the previous one has finished.

NOTE: In this example, the top-level DAGs are named importer_child_v1_db_X and their corresponding task_ids (for TriggerDagRunOperator) are named importer_v1_db_X
Would it be possible to just have the TriggerDagRunOperator be the last task in a DAG?
I have to chain several similar (differing only in arguments) DAGs together in a workflow that triggers them one by one. So there isn't just one TriggerDagRunOperator that I could put last; there are many (here 3, but up to 15 in production).

Solution

Taking hints from @Viraj Parekh's answer, I was able to make TriggerDagRunOperator work in the intended fashion. I'm hereby posting my (partial) answer; will update as and when things become clear.
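In outline, the fix pairs each TriggerDagRunOperator with a (modified) ExternalTaskSensor, so the chain becomes trigger → wait → trigger → wait. A minimal sketch of that wiring, using a stand-in operator class so it is self-contained (a real dags file would use the actual Airflow operators; the task names follow the importer_v1_db_X convention from the question):

```python
class Op:
    """Stand-in for an Airflow BaseOperator: records downstream links via >>."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        self.downstream.append(other)
        return other

# One trigger + one "wait for completion" sensor per external DAG
chain = []
for db in ["db_1", "db_2", "db_3"]:
    trigger = Op(f"importer_v1_{db}")        # would be a TriggerDagRunOperator
    wait = Op(f"wait_importer_v1_{db}")      # would be the adjusted ExternalTaskSensor
    chain += [trigger, wait]

# Wire them sequentially: each sensor gates the next trigger
for upstream, downstream in zip(chain, chain[1:]):
    upstream >> downstream

print([t.task_id for t in chain])
```

Because every trigger is followed by a sensor that only succeeds once the triggered DAG has finished, the next trigger cannot fire early, which restores the sequential behaviour.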
How to overcome the limitation of the parent_id prefix in the dag_id of SubDags?

As @Viraj said, there's no straightforward way of achieving this. Extending SubDagOperator to remove this check might work, but I decided to steer clear of it.
How to force TriggerDagRunOperators to await completion of a DAG?

- Looking at the implementation, it becomes clear that the job of TriggerDagRunOperator is just to trigger the external DAG; and that's about it. By default, it is not supposed to wait for completion of the DAG, so the behaviour I was observing is understandable.
- ExternalTaskSensor is the obvious way out. However, while learning the basics of Airflow I was relying on manual triggering of DAGs (schedule_interval=None). In such a case, ExternalTaskSensor makes it difficult to accurately specify the execution_date of the external task (whose completion is being awaited), failing which the sensor gets stuck.
- So, taking a hint from the implementation, I made a minor adjustment to the behaviour of ExternalTaskSensor: it awaits completion of all task_instances of the concerned task having

  execution_date[external_task] >= execution_date[TriggerDagRunOperator] + execution_delta

This achieves the desired result: the external DAGs run one after another in sequence.
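The adjusted completion check can be sketched in plain Python (the helper name and the tuple representation are hypothetical; in the real sensor this condition is evaluated in the poke method against rows of Airflow's TaskInstance table):

```python
from datetime import datetime, timedelta

def external_task_done(task_instances, trigger_execution_date, execution_delta):
    """Return True once every relevant run of the external task has succeeded.

    task_instances: list of (execution_date, state) tuples, standing in for
    TaskInstance rows of the sensed task.
    """
    cutoff = trigger_execution_date + execution_delta
    # Only runs at or after the trigger's execution_date + delta count
    relevant = [(dt, state) for dt, state in task_instances if dt >= cutoff]
    return bool(relevant) and all(state == "success" for _, state in relevant)

trigger_date = datetime(2018, 1, 1)
instances = [
    (datetime(2017, 12, 31), "failed"),   # older run, ignored by the cutoff
    (datetime(2018, 1, 1), "success"),
    (datetime(2018, 1, 2), "running"),    # still going, so keep poking
]
print(external_task_done(instances, trigger_date, timedelta(0)))  # False
```

The key difference from the stock sensor is the `>=` comparison: it tolerates an external run whose execution_date is not known exactly in advance, which is what makes manual triggering workable.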
Is there a workaround for my approach of creating separate files (for DAGs that differ only in input) for each top-level DAG?

Again, going by @Viraj, this can be done by assigning DAGs to the global scope using globals()[dag_id] = DAG(..)
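The pattern can be sketched like this (a stand-in DAG class is used so the snippet is self-contained; in a real dags file you would import airflow.DAG and pass the usual default_args / schedule, and the database names here are hypothetical):

```python
# Stand-in for airflow.DAG, just to illustrate the registration mechanics
class DAG:
    def __init__(self, dag_id):
        self.dag_id = dag_id

# One file can declare many near-identical DAGs: Airflow's DagBag picks up
# any DAG object reachable from the module's global namespace, so each
# generated DAG is registered under its own name via globals().
for db in ["db_1", "db_2", "db_3"]:  # hypothetical databases
    dag_id = f"importer_child_v1_{db}"
    globals()[dag_id] = DAG(dag_id)

# The generated names now behave like ordinary module-level variables
print(importer_child_v1_db_2.dag_id)
```

This collapses N nearly-identical files into one parameterised file while keeping each DAG independently triggerable under its own dag_id.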
EDIT-1

Maybe I was referring to an incorrect resource (the link above is already dead), but ExternalTaskSensor already includes the params execution_delta & execution_date_fn to easily restrict the execution_date(s) of the task being sensed.