Airflow: How to get the return output of one task to set the dependencies of the downstream tasks to run?

Question

We have a kubernetes pod operator that will spit out a python dictionary that will define which further downstream kubernetes pod operators to run along with their dependencies and the environment variables to pass into each operator.

How do I get this python dictionary object back into the executor's context (or is it worker's context?) so that airflow can spawn the downstream kubernetes operators?

I've looked at BranchOperator and TriggerDagRunOperator and XCOM push/pull and Variable.get and Variable.set, but nothing seems to quite work.

Answer


"We have a kubernetes pod operator that will spit out a python dictionary that will define which further downstream kubernetes pod operators to run"

This is possible, albeit not in the way you are trying. You'll have to have all possible KubernetesPodOperators already in your workflow and then skip those that need not be run.

An elegant way to do this would be to attach a ShortCircuitOperator before each KubernetesPodOperator that reads the XCom (dictionary) published by the upstream KubernetesPodOperator and determines whether or not to continue with the downstream task.
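
As a rough illustration, here is a minimal sketch of that pattern using Airflow 1.10-style imports (newer versions expose these operators under airflow.operators.python and the cncf.kubernetes provider instead). The task ids, images, and the shape of the planning dictionary (a "tasks_to_run" key) are assumptions made up for this example, not part of the original question:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

dag = DAG("pod_fanout", start_date=datetime(2020, 1, 1), schedule_interval=None)

# Upstream pod that produces the planning dictionary. With XCom pushing enabled,
# whatever the pod writes to /airflow/xcom/return.json is published to XCom
# (the exact parameter name has shifted between Airflow versions).
planner = KubernetesPodOperator(
    task_id="planner",
    name="planner",
    namespace="default",
    image="my-planner-image:latest",
    do_xcom_push=True,
    dag=dag,
)

def should_run(task_name, **context):
    # Pull the dictionary published by the planner pod; returning False makes
    # the ShortCircuitOperator skip everything downstream of this gate.
    plan = context["ti"].xcom_pull(task_ids="planner") or {}
    return task_name in plan.get("tasks_to_run", {})

# Every pod that could possibly run must already be declared in the DAG.
for pod_name in ["pod_a", "pod_b", "pod_c"]:
    gate = ShortCircuitOperator(
        task_id="gate_" + pod_name,
        python_callable=should_run,
        op_kwargs={"task_name": pod_name},
        provide_context=True,
        dag=dag,
    )
    worker = KubernetesPodOperator(
        task_id=pod_name,
        name=pod_name,
        namespace="default",
        image="my-worker-image:latest",
        dag=dag,
    )
    planner >> gate >> worker
```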

EDIT-1

Actually, a cleaner way would be to just raise an AirflowSkipException within the task that you want to skip (rather than using a separate ShortCircuitOperator to do this).
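
Since the downstream tasks here are pods rather than plain Python callables, one hedged way to apply that idea would be a small KubernetesPodOperator subclass that checks the planner's XCom before launching the pod; the class name and dictionary layout are again assumptions for illustration:

```python
from airflow.exceptions import AirflowSkipException
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

class SkippableKubernetesPodOperator(KubernetesPodOperator):
    """Launches the pod only if the planner's dictionary asks for it."""

    def execute(self, context):
        plan = context["ti"].xcom_pull(task_ids="planner") or {}
        if self.task_id not in plan.get("tasks_to_run", {}):
            # AirflowSkipException marks this task instance as 'skipped'
            # rather than failed; downstream tasks with the default
            # trigger rule are then skipped as well.
            raise AirflowSkipException(
                "%s was not requested by the planner" % self.task_id
            )
        return super().execute(context)
```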


"How do I get this python dictionary ... so that airflow can spawn the downstream kubernetes operators ..."

No. You can't dynamically spawn new tasks based on the output of an upstream task.

Think of it this way: for the scheduler it is imperative to know all the tasks (their task_ids, trigger_rules, priority_weights, etc.) ahead of time so as to be able to execute them when the right time comes. If tasks were to just keep coming up dynamically, then Airflow's scheduler would have to become akin to an Operating System scheduler (!). For more details, read the EDIT-1 part of this answer.
