Conditionally execute multiple branches one by one
Problem description

Note: please read and understand the question carefully before answering; this cannot be solved by a simple BranchPythonOperator / ShortCircuitOperator.

We have an unusual multiplexer-like use-case in our workflow:
                  +-----------------------+
                  |                       |
    +------------>+  branch-1.begin-task  |
    |             |                       |
    |             +-----------------------+
    |
    |             +-----------------------+
    |             |                       |
    +------------>+  branch-2.begin-task  |
    |             |                       |
+---+--------+    +-----------------------+
|            |
|  MUX-task  |              .
|            |              .
+---+--------+              .
    |
    |             +-----------------------+
    |             |                       |
    +------------>+  branch-n.begin-task  |
                  |                       |
                  +-----------------------+
The flow is expected to work as follows:
- The MUX-task listens for events on an external queue (a single queue); each event on the queue triggers execution of one of the branches (branch-n.begin-task)
- One by one, as events arrive, the MUX-task must trigger execution of the respective branch
- Once all branches have been triggered, the MUX-task completes
Assumptions
- Exactly n events arrive on the queue, one for triggering each branch
- n is dynamically known: its value is defined in a Variable
Limitations
- There is only one external queue where events arrive
- We can't have n queues (one per branch) since the branches grow over time (n is dynamically defined)
We are not able to come up with a solution within Airflow's set of operators and sensors (or anything similar available out-of-the-box in Airflow) to build this:
- Sensors can be used to listen for events on an external queue; but we have to listen for multiple events, not one
- BranchPythonOperator can be used to trigger execution of a single branch out of many, but it immediately marks the remaining branches as skipped
Primary bottleneck

Because of the 2nd limitation above, even a custom operator combining the functionality of a Sensor and a BranchPythonOperator won't work.
We have tried to brainstorm around a fancy combination of Sensors, DummyOperator, and trigger_rules to achieve this, but have had no success thus far.

Is this doable in Airflow?
UPDATE-1
Here's some background info to understand the context of the workflow:
- We have an ETL pipeline to sync MySQL tables (across multiple Aurora databases) to our data lake
- To overcome the impact of our sync pipeline on the production databases, we decided to:
  - for each database, create a snapshot (restore an AuroraDB cluster from the last backup)
  - run the MySQL sync pipeline using that snapshot
  - at the end of the sync, terminate the snapshot (AuroraDB cluster)
- There is a single queue for all databases
- This setup was done by our DevOps team (in a different AWS account; we don't have access to the underlying Lambdas / SQS / infra)
Accepted answer
XCOM to the rescue! We decided to model the tasks as follows (both tasks are custom operators):
- The MUX-task is more like an iterative sensor: it keeps listening for events on the queue and takes some action against each event arriving on the queue
- All branch-x.begin-tasks are simple sensors: they listen for the publishing of an XCOM (whose name follows a pre-defined, specific format)
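The begin-task sensors described above can be sketched as follows. This is a minimal, pure-Python stand-in, not the actual implementation: a shared dict plays the role of Airflow's XCOM store, and `branch_sensor_poke` is the body a custom `BaseSensorOperator.poke()` would have (where it would call `ti.xcom_pull(...)` instead, ideally with `mode='reschedule'` to free the worker slot between pokes). The key format is an illustrative assumption:

```python
# Stand-in for the branch-x.begin-task sensors.
# XCOM_STORE simulates Airflow's XCOM backend; a real custom sensor
# would call ti.xcom_pull(key=...) inside poke() instead.

XCOM_STORE = {}  # maps XCOM key -> value

def make_xcom_key(branch_id: str) -> str:
    """Pre-defined, specific key format the MUX-task and sensors agree on
    (the exact format here is a hypothetical choice)."""
    return f"mux.trigger.{branch_id}"

def branch_sensor_poke(branch_id: str) -> bool:
    """Return True once the MUX-task has published this branch's XCOM.
    Returning False makes Airflow re-poke (or reschedule) later."""
    return make_xcom_key(branch_id) in XCOM_STORE

# Before the MUX-task publishes anything, the sensor keeps waiting:
assert branch_sensor_poke("branch-1") is False
# Once the XCOM for branch-1 is published, the gate opens:
XCOM_STORE[make_xcom_key("branch-1")] = "go"
assert branch_sensor_poke("branch-1") is True
```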
The workflow goes like this:
- The MUX-task listens for events on the queue (the listening part is enclosed in a for-loop with as many iterations as the number of branches)
- When an event arrives, the MUX-task picks it up; it identifies which 'branch' should be triggered and publishes an XCOM for the respective branch
- The respective branch's sensor picks up that XCOM on its next poke, and the branch starts executing. In effect, the branch's sensor merely acts as a gateway that opens upon an external event (the XCOM) and allows execution of the branch
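The MUX-task's listening loop can be sketched in the same stand-in style, with `queue.Queue` in place of the external queue and a dict in place of XCOM; the event shape and key format are illustrative assumptions:

```python
import queue

def mux_task_execute(external_queue, n_branches, xcom_store, timeout=5):
    """Stand-in for the MUX-task's execute(): a for-loop with as many
    iterations as there are branches; each received event names the
    branch whose gate should be opened."""
    for _ in range(n_branches):
        event = external_queue.get(timeout=timeout)    # block until an event arrives
        branch_id = event["branch"]                    # identify which branch to trigger
        xcom_store[f"mux.trigger.{branch_id}"] = "go"  # publish that branch's XCOM

q = queue.Queue()
for b in ("branch-2", "branch-1", "branch-3"):  # events may arrive in any order
    q.put({"branch": b})

store = {}
mux_task_execute(q, n_branches=3, xcom_store=store)
assert store == {f"mux.trigger.branch-{i}": "go" for i in (1, 2, 3)}
```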
Since there are so many sensors (one per branch), we would most likely employ mode='reschedule' to overcome deadlocks.
- Since the described approach relies heavily on polling, we don't deem it to be super efficient.
- A reactive, trigger-based approach would be more desirableable, but we haven't been able to work it out.
UPDATE-1
- It looks like a 'reactive' approach is achievable if we could model each branch as a separate DAG and, instead of publishing XCOMs for each branch, trigger the branch's DAG just like TriggerDagRunOperator does
- But since our monolithic DAG is generated programmatically via complex logic, this change would have been quite hard (lots of code rewrite). So we decided to continue with the poll-based approach and live with a few minutes of extra delay in a pipeline that already takes several hours to complete
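For completeness, the reactive variant that was ruled out might look like this. It is a hypothetical sketch only: a plain recording function stands in for what TriggerDagRunOperator (or Airflow's trigger-DAG API) would do, and the DAG-id scheme is an invented assumption:

```python
import queue

triggered = []  # records which branch-DAGs got triggered, for illustration

def trigger_dag(dag_id: str) -> None:
    """Stand-in for firing a DAG run, as TriggerDagRunOperator would."""
    triggered.append(dag_id)

def reactive_mux(external_queue, n_branches):
    # Reactive variant: each queue event immediately triggers the
    # respective branch's DAG run; no per-branch sensors, so no
    # polling delay between event arrival and branch start.
    for _ in range(n_branches):
        event = external_queue.get(timeout=5)
        trigger_dag(f"branch-{event['branch']}.dag")  # hypothetical DAG-id scheme

q = queue.Queue()
q.put({"branch": 1})
q.put({"branch": 2})
reactive_mux(q, n_branches=2)
assert triggered == ["branch-1.dag", "branch-2.dag"]
```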
UPDATE-2
[Referring to the UPDATE-1 section of the question]

Since our actual implementation only required us to wait for the creation of a database, we decided to simplify the workflow as follows:
- Database endpoints were fixed via DNS (they didn't change every time an Aurora snapshot was restored)
- We did away with the MUX-task (and with it the SQS queue for Aurora restore lifecycle events)
- Each branch's begin task, branch-x.begin-task, was modelled as a simple sensor that tried firing a dummy SQL query (SELECT 1) to check whether the database endpoint had become active
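The simplified begin-task check can be sketched like this; a pure-Python stand-in in which an in-memory sqlite3 database plays the MySQL endpoint behind the fixed DNS name (in a real DAG this would be Airflow's SqlSensor, or a custom sensor using a MySQL hook):

```python
import sqlite3

def endpoint_is_active(connect) -> bool:
    """Stand-in for the begin-task sensor's poke(): fire a dummy
    'SELECT 1' and report whether the database endpoint answered."""
    try:
        conn = connect()
        try:
            return conn.execute("SELECT 1").fetchone() == (1,)
        finally:
            conn.close()
    except Exception:
        return False  # endpoint not up yet -> sensor keeps poking

# An in-memory sqlite3 database stands in for a live MySQL endpoint:
assert endpoint_is_active(lambda: sqlite3.connect(":memory:")) is True

def not_yet_up():
    """Simulates the snapshot still restoring: connection attempts fail."""
    raise ConnectionError("DNS resolves but nothing is listening yet")

assert endpoint_is_active(not_yet_up) is False
```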