Conditionally execute multiple branches one by one


Question


We have an unusual multiplexer-like use-case in our workflow

                                +-----------------------+
                                |                       |
                  +------------>+  branch-1.begin-task  |
                  |             |                       |
                  |             +-----------------------+
                  |
                  |
                  |             +-----------------------+
                  |             |                       |
                  +------------>+  branch-2.begin-task  |
                  |             |                       |
+------------+    |             +-----------------------+
|            |    |
|  MUX-task  +----+                         +
|            |    |                         |
+------------+    |                         |
                  |                         |
                  +- -- -- -- ->            |
                  |                         |
                  |                         |
                  |                         |
                  |                         +
                  |
                  |             +-----------------------+
                  |             |                       |
                  +------------>+  branch-n.begin-task  |
                                |                       |
                                +-----------------------+


The flow is expected to work as follows

  • MUX-task listens for events on an external queue (single queue)
  • each event on the queue triggers execution of one of the branches (branch-n.begin-task)
  • one-by-one, as events arrive, the MUX-task must trigger execution of the respective branch
  • once all branches have been triggered, the MUX-task completes

Assumptions

  • Exactly n events arrive on the queue, one for triggering each branch
  • n is dynamically known: its value is defined in a Variable

Constraints

  • There is only a single external queue on which events arrive
  • we can't have n queues (one per branch) since the set of branches grows with time (n is dynamically defined)

We are not able to come up with a solution within Airflow's set of operators and sensors (or any such thing available out-of-the-box in Airflow) to build this

  1. Sensors can be used for listening for events on an external queue; but we have to listen for multiple events, not one
  2. BranchPythonOperator can be used to trigger execution of a single branch out of many, but it immediately marks the remaining branches as skipped
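To make limitation 2 concrete, here is a minimal sketch (the event format and task-id naming are hypothetical, not from the question): a BranchPythonOperator's callable returns exactly one task_id per DAG run, and every other downstream branch is immediately marked skipped, so it cannot fire branches one at a time as events trickle in.

```python
# Sketch of limitation 2: a BranchPythonOperator picks ONE branch per DAG run
# and skips the rest. The Airflow import is guarded so the pure dispatch
# logic below also runs without Airflow installed.
try:
    from airflow.operators.python import BranchPythonOperator
except ImportError:
    BranchPythonOperator = None  # not needed for the pure-logic part

def choose_branch(event: dict) -> str:
    """Map a queue event to the single branch it would trigger.

    Hypothetical event shape: {'db': 'db-3'} -> 'branch-3.begin-task'
    """
    branch_num = event["db"].split("-")[-1]
    return f"branch-{branch_num}.begin-task"

# In a real DAG this would be wired roughly as:
#   mux = BranchPythonOperator(task_id="mux-task",
#                              python_callable=lambda: choose_branch(next_event))
# The returned branch runs; all other branches are marked skipped at once,
# which is exactly why this operator cannot serve the one-event-at-a-time case.
```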

Primary bottleneck

Because of the 2nd limitation above, even a custom operator combining the functionality of a Sensor and BranchPythonOperator won't work.

We have tried to brainstorm around a fancy combination of Sensors, DummyOperator and trigger_rules to achieve this, but have had no success thus far.

Is this doable in Airflow?

UPDATE-1

Here's some background info to understand the context of the workflow

  • we have an ETL pipeline to sync MySQL tables (across multiple Aurora databases) to our data-lake
  • to overcome the impact of our sync pipeline on production databases, we have decided to do this
    • for each database, create a snapshot (restore AuroraDB cluster from last backup)
    • run MySQL sync pipeline using that snapshot
    • at the end of sync, terminate the snapshot (AuroraDB cluster)
    • single queue for all databases
    • this setup was done by our DevOps team (different AWS account, we don't have access to the underlying Lambdas / SQS / infra)

Answer

XCOM to the rescue!

We decided to model the tasks as follows (both tasks are custom operators)

  • The MUX-task is more like an iterative sensor: it keeps listening for events on the queue and takes some action against each event arriving on the queue
  • All branch-x.begin-tasks are simple sensors: they listen for the publishing of an XCOM (whose name is in a pre-defined specific format)
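The branch-side gateway sensor could be sketched as below; the class name, XCom key format and task-id defaults are our assumptions, not the answer's actual code.

```python
# Sketch of a branch-x.begin-task gateway sensor: it succeeds once the
# MUX-task has published an XCom under a pre-agreed key for this branch.
# The Airflow import is guarded so the key-format helper also runs
# without Airflow installed.
try:
    from airflow.sensors.base import BaseSensorOperator
except ImportError:
    BaseSensorOperator = object  # fallback for environments without Airflow

def xcom_key_for(branch_id: int) -> str:
    """Pre-defined key format shared by MUX-task and branch sensors (assumed)."""
    return f"trigger-branch-{branch_id}"

class BranchGateSensor(BaseSensorOperator):
    """Opens the gate to its branch when the MUX-task publishes our XCom."""

    def __init__(self, branch_id, mux_task_id="mux-task", **kwargs):
        super().__init__(**kwargs)
        self.branch_id = branch_id
        self.mux_task_id = mux_task_id

    def poke(self, context) -> bool:
        # Sensor succeeds as soon as the MUX-task has pushed our XCom
        value = context["ti"].xcom_pull(
            task_ids=self.mux_task_id, key=xcom_key_for(self.branch_id)
        )
        return value is not None
```

A branch would then instantiate it e.g. as `BranchGateSensor(task_id="branch-3.begin-task", branch_id=3, mode="reschedule", poke_interval=60)`, with `mode="reschedule"` freeing the worker slot between pokes.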

The workflow works as follows

  • The MUX-task listens for events on the queue (the listening part is enclosed in a for-loop with as many iterations as the number of branches)
  • When an event arrives, the MUX-task picks it up; it identifies which 'branch' should be triggered and publishes an XCOM for the respective branch
  • The respective branch's sensor picks up that XCOM on its next poke and the branch starts executing. In effect, the branch's sensor merely acts as a gateway that opens up with an external event (XCOM) and allows execution of the branch
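The MUX-task side of that loop might look roughly like this; the queue client, event shape and key format are hypothetical placeholders.

```python
# Sketch of the MUX-task as an iterative sensor-like operator: one loop
# iteration per branch; each queue event is mapped to a branch and an XCom
# is published for that branch's gateway sensor. The Airflow import is
# guarded so the pure event-dispatch helper also runs without Airflow.
try:
    from airflow.models import BaseOperator
except ImportError:
    BaseOperator = object  # fallback for environments without Airflow

def branch_for_event(event: dict) -> int:
    """Identify which branch an event should trigger (assumed event shape)."""
    return int(event["branch_id"])

class MuxOperator(BaseOperator):
    def __init__(self, num_branches, receive_event, **kwargs):
        super().__init__(**kwargs)
        self.num_branches = num_branches    # n, read from a Variable in practice
        self.receive_event = receive_event  # blocking call to the external queue

    def execute(self, context):
        # exactly n events are expected, one per branch
        for _ in range(self.num_branches):
            event = self.receive_event()
            branch_id = branch_for_event(event)
            # publish the XCom that the respective branch's sensor pokes for
            context["ti"].xcom_push(key=f"trigger-branch-{branch_id}", value=event)
        # once all branches have been triggered, the MUX-task completes
```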

Since there are too many sensors (one per branch), we would most likely be employing mode='reschedule' to overcome deadlocks

  • Since the described approach relies heavily on polling, we don't deem it to be super efficient.
  • A reactive triggering based approach would be more desirable, but we haven't been able to work it out

UPDATE-1

  • Looks like the 'reactive' approach is achievable if we could model each branch as a separate DAG and, instead of publishing XCOMs for each branch, trigger the branch's DAG just like TriggerDagRunOperator does
  • But since our monolithic DAG is generated programmatically via complex logic, this change would have been quite hard (lots of code rewrite). So we decided to continue with the poll-based approach and live with a few minutes of extra delay in a pipeline that already takes several hours to complete
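For illustration, the reactive variant could be sketched as follows; the per-branch DAG-id naming convention is an assumption.

```python
# Sketch of the 'reactive' alternative: if each branch were its own DAG,
# the MUX-task's event loop could trigger it directly (no polling sensors),
# just as TriggerDagRunOperator does. The import is guarded so the
# naming helper also runs without Airflow installed.
try:
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
except ImportError:
    TriggerDagRunOperator = None  # not needed for the pure-logic part

def dag_id_for(branch_id: int) -> str:
    """Per-branch DAG id (naming convention assumed)."""
    return f"branch-{branch_id}-dag"

# Inside the MUX-task's event loop, each event would fire its branch DAG:
#   TriggerDagRunOperator(
#       task_id=f"trigger-branch-{branch_id}",
#       trigger_dag_id=dag_id_for(branch_id),
#       conf={"event": event},
#   )
# This removes all polling, at the cost of splitting the monolithic DAG.
```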

UPDATE-2

[Refer to the UPDATE-1 section of the question]

Since our actual implementation required us to just wait for the creation of the database, we decided to simplify the workflow as follows

  • database endpoints were fixed via DNS (they didn't change every time the Aurora snapshot was restored)
  • we did away with the MUX-task (and so also the SQS queue for Aurora restore lifecycle events)
  • each branch's begin-task branch-x.begin-task was modelled as a simple sensor that tries firing a dummy SQL query (SELECT 1) to check whether the database endpoint has become active
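The simplified begin-task could be sketched with a stock SqlSensor; the connection-id naming is an assumption, and the import path is the Airflow 2.x core one.

```python
# Sketch of the simplified branch-x.begin-task: a plain SqlSensor firing a
# dummy SELECT 1 against the (DNS-fixed) database endpoint until it responds.
# The import is guarded so the naming helper also runs without Airflow.
try:
    from airflow.sensors.sql import SqlSensor
except ImportError:
    SqlSensor = None  # not needed for the pure-logic part

def conn_id_for(branch_id: int) -> str:
    """Airflow connection pointing at a branch's database endpoint (assumed)."""
    return f"aurora-branch-{branch_id}"

def make_begin_task(branch_id: int):
    # succeeds as soon as the restored Aurora endpoint accepts queries
    return SqlSensor(
        task_id=f"branch-{branch_id}.begin-task",
        conn_id=conn_id_for(branch_id),
        sql="SELECT 1",
        mode="reschedule",   # free the worker slot between pokes
        poke_interval=60,
    )
```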
