Apache Airflow - get all parent task_ids

Question

Assume the following:

[c1, c2, c3] >> child_task

where all c1, c2, c3 and child_task are operators and have task_id equal to id1, id2, id3 and child_id respectively.
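
To make the fan-in dependency concrete, here is a toy, Airflow-free model of what `[c1, c2, c3] >> child_task` records. The `ToyTask` class is hypothetical and only mimics the idea: a Python list has no `__rshift__` that understands operators, so Python falls back to the right operand's `__rrshift__`, which (roughly as Airflow's operators do) registers every list element as upstream of `child_task`.

```python
from typing import Iterable, List, Set, Union


class ToyTask:
    """Hypothetical stand-in for an Airflow operator (not the real API)."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream_task_ids: Set[str] = set()
        self.downstream_task_ids: Set[str] = set()

    def set_upstream(self, others: Union["ToyTask", Iterable["ToyTask"]]) -> None:
        # Accept a single task or any iterable of tasks
        if isinstance(others, ToyTask):
            others = [others]
        for other in others:
            self.upstream_task_ids.add(other.task_id)
            other.downstream_task_ids.add(self.task_id)

    def __rshift__(self, other: "ToyTask") -> "ToyTask":
        # task_a >> task_b: task_a becomes upstream of task_b
        other.set_upstream(self)
        return other

    def __rrshift__(self, others: List["ToyTask"]) -> "ToyTask":
        # [a, b] >> task: list has no __rshift__ for ToyTask, so Python calls this
        self.set_upstream(others)
        return self


c1, c2, c3 = ToyTask("id1"), ToyTask("id2"), ToyTask("id3")
child_task = ToyTask("child_id")
[c1, c2, c3] >> child_task

print(sorted(child_task.upstream_task_ids))  # ['id1', 'id2', 'id3']
```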

The task child_task is also a PythonOperator with provide_context=True and python_callable=dummy_func:

def dummy_func(**context):
    ...

Is it possible to retrieve all parents' ids inside dummy_func (perhaps by browsing the DAG somehow using the context)?

Expected result in this case would be a list ['id1', 'id2', 'id3'].

Answer

The upstream_task_ids and downstream_task_ids properties of BaseOperator are meant just for this purpose.

from typing import Set
...
# Both properties return sets of task_ids of the direct neighbours
parent_task_ids: Set[str] = my_task.upstream_task_ids
child_task_ids: Set[str] = my_task.downstream_task_ids
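
Inside the callable, Airflow exposes the operator being executed in the context under the "task" key, so the question's dummy_func can read its own parents directly. A minimal sketch follows; because upstream_task_ids is a set, it is sorted to produce a deterministic list. The SimpleNamespace object in the usage lines is a hypothetical stand-in for the real PythonOperator so that the snippet runs without Airflow.

```python
from types import SimpleNamespace
from typing import Any, List


def dummy_func(**context: Any) -> List[str]:
    # Airflow passes the operator currently being executed as context["task"]
    task = context["task"]
    # upstream_task_ids is a set of direct parents; sort it for stable output
    parent_ids: List[str] = sorted(task.upstream_task_ids)
    print(parent_ids)
    return parent_ids


# Usage with a stand-in object instead of a real PythonOperator
fake_task = SimpleNamespace(upstream_task_ids={"id3", "id1", "id2"})
dummy_func(task=fake_task)  # prints ['id1', 'id2', 'id3']
```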

Do note, however, that this property only gives you the immediate (upstream / downstream) neighbours of a task. To get all ancestor or descendant tasks, you can quickly cook up the good old graph-theory approach, such as this BFS-like implementation:

from typing import List, Set
from queue import Queue
from airflow.models import BaseOperator

def get_ancestor_tasks(my_task: BaseOperator) -> List[BaseOperator]:
    ancestor_task_ids: Set[str] = set()
    tasks_queue: Queue = Queue()
    # determine parent tasks to begin BFS
    for task in my_task.upstream_list:
        tasks_queue.put(task)
    # perform BFS
    while not tasks_queue.empty():
        task: BaseOperator = tasks_queue.get()
        if task.task_id in ancestor_task_ids:
            continue  # already reached through another path
        # set.add() takes no keyword arguments, so pass the id positionally
        ancestor_task_ids.add(task.task_id)
        for _task in task.upstream_list:
            tasks_queue.put(_task)
    # Convert task_ids to actual tasks
    ancestor_tasks: List[BaseOperator] = [
        task for task in my_task.dag.tasks if task.task_id in ancestor_task_ids
    ]
    return ancestor_tasks

The snippet above is untested, but it should give you a good starting point.
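
The same traversal can be written once for both directions. The sketch below is a variation, not an Airflow API: it uses collections.deque (a plain double-ended queue, without the locking of queue.Queue), skips tasks already seen so that a task reachable along several paths (a diamond-shaped DAG) is expanded only once, and walks upstream_list or downstream_list depending on a flag. The Node class and link helper are hypothetical stand-ins that expose the same attribute names as BaseOperator, so the example runs without Airflow.

```python
from collections import deque
from typing import Deque, List, Set


class Node:
    """Hypothetical stand-in with the same attribute names as BaseOperator."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.upstream_list: List["Node"] = []
        self.downstream_list: List["Node"] = []


def link(parent: Node, child: Node) -> None:
    # Record the edge parent >> child in both directions
    parent.downstream_list.append(child)
    child.upstream_list.append(parent)


def get_relative_ids(task, upstream: bool = True) -> Set[str]:
    """Breadth-first search over all ancestors (or descendants) of task."""

    def neighbours(t):
        return t.upstream_list if upstream else t.downstream_list

    seen: Set[str] = set()
    queue: Deque = deque(neighbours(task))
    while queue:
        current = queue.popleft()
        if current.task_id in seen:
            continue  # already reached via another path
        seen.add(current.task_id)
        queue.extend(neighbours(current))
    return seen


# Diamond: root >> [left, right] >> leaf
root, left, right, leaf = (Node(i) for i in ("root", "left", "right", "leaf"))
for parent, child in [(root, left), (root, right), (left, leaf), (right, leaf)]:
    link(parent, child)

print(sorted(get_relative_ids(leaf)))  # ['left', 'right', 'root']
print(sorted(get_relative_ids(root, upstream=False)))  # ['leaf', 'left', 'right']
```

With a real operator (for example context["task"]), the returned ids can be turned back into task objects with my_task.dag.get_task(task_id).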
