Apache Airflow-获取所有父task_ids [英] Apache Airflow - get all parent task_ids
问题描述
假设以下情况:
[c1, c2, c3] >> child_task
其中所有 c1
, c2
, c3
和 child_task
是运算符,并且具有 task_id
等于 id1
, id2
, id3
和 child_id
。
where all c1
, c2
, c3
and child_task
are operators and have task_id
equal to id1
, id2
, id3
and child_id
respectively.
任务 child_task
也是 PythonOperator
,其中 provide_context = True
和 python_callable = dummy_func
def dummy_func(**context):
#...
是否有可能在 dummy_func
内检索所有父母的身份证(也许
Is it possible to retrieve all parents' ids inside the dummy_func
(perhaps by browsing the dag somehow using the context)?
在这种情况下,预期结果将是列表 ['id1','id2','id3 ']
。
Expected result in this case would be a list ['id1', 'id2', 'id3']
.
推荐答案
upstream_task_ids
和 downstream_task_ids
BaseOperator
的属性仅用于此目的。
The upstream_task_ids
and downstream_task_ids
properties of BaseOperator
are meant just for this purpose.
from typing import List
..
parent_task_ids: List[str] = my_task.upstream_task_ids
child_task_ids: List[str] = my_task_downstream_task_ids
但是请注意,该属性
,您只会获得任务的直接(上游/下游)邻居。为了获得所有祖先或后代 任务
,您可以快速构建好的旧的图论方法例如这样的 BFS
- like 实现
Do note however that with this property
, you only get immediate (upstream / downstream) neighbour(s) of a task. In order to get all ancestor or descendent task
s, you can quickly cook-up the good old graph theory approach such as this BFS
-like implementation
from typing import List, Set
from queue import Queue
from airflow.models import BaseOperator
def get_ancestor_tasks(my_task: BaseOperator) -> List[BaseOperator]:
ancestor_task_ids: Set[str] = set()
tasks_queue: Queue = Queue()
# determine parent tasks to begin BFS
for task in my_task.upstream_list:
tasks_queue.put(item=task)
# perform BFS
while not tasks_queue.empty():
task: BaseOperator = tasks_queue.get()
ancestor_task_ids.add(element=task.task_id)
for _task in task.upstream_list:
tasks_queue.put(item=_task)
# Convert task_ids to actual tasks
ancestor_tasks: List[BaseOperator] = [task for task in my_task.dag.tasks if task.task_id in ancestor_task_ids]
return ancestor_tasks
上面的代码段未经测试,但是我敢肯定您可以从中获得启发
参考
- Get all Airflow Leaf Nodes/Tasks
- Python Queue
- Python 3 type-annotations
这篇关于Apache Airflow-获取所有父task_ids的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!