气流在单个DAG中生成动态任务,任务N + 1取决于TaskN [英] Airflow Generate Dynamic Tasks in Single DAG , Task N+1 is Dependent on TaskN

查看:143
本文介绍了气流在单个DAG中生成动态任务,任务N + 1取决于TaskN的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

动态生成任务时,我需要让任务2依赖于任务1,任务1 >>任务2或task2.set_upstream(task1).

When generating tasks dynamically, I need to have Task 2 be dependent of Task 1, Task1 >> Task 2 or task2.set_upstream(task1).

由于task_ids是经过评估的,或者似乎是预先确定的,因此我无法提前设置依赖项,因此将不胜感激.

Since the task_ids are evaluated, or seem to be upfront, I cannot set the dependency in advance, any help would be appreciated.

Component(I)任务可以很好地运行,只是它们一次运行即可.

The Component(I) tasks generate fine, except that they all run at once.

for i in range(1,10):
  task_id='Component'+str(i)
  task_id = BashOperator(
  task_id='Component'+str(i),
  bash_command="echo  {{ ti.xcom_pull task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i) ,
  xcom_push=True,
  dag=dag) 
  ?????.set_upstream(??????)

推荐答案

使用以下代码:

a = []
for i in range(0,10):
    a.append(BashOperator(
        task_id='Component'+str(i),
        bash_command="echo  {{ ti.xcom_pull task_ids='SomeOtherTaskXcom', key='return_value') }} -z " + str(i) ,
        xcom_push=True,
        dag=dag))
    if i not in [0]: 
        a[i-1] >> a[i]

使用DummyOperator,代码如下:

a = []
for i in range(0,10):
    a.append(DummyOperator(
        task_id='Component'+str(i),
        dag=dag))
    if i not in [0]: 
        a[i-1] >> a[i]

这将生成以下DAG:

这篇关于气流在单个DAG中生成动态任务,任务N + 1取决于TaskN的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆