Want to create airflow tasks that are downstream of the current task

Problem description

I'm mostly brand new to Airflow.

I have two steps:


  1. Get all files that match a criteria

  2. Uncompress the files

The files are half a gig compressed, and 2 - 3 gig when uncompressed. I can easily have 20+ files to process at a time, which means uncompressing all of them can run longer than just about any reasonable timeout.

I could use XCom to get the results of step 1, but what I'd like to do is something like this:

import os
from airflow.operators.python_operator import PythonOperator

def processFiles(reqDir, gvcfDir, matchSuffix):
    theFiles = getFiles(reqDir, gvcfDir, matchSuffix)

    # create one uncompress task per matching file
    for theFile in theFiles:
        task = PythonOperator(task_id="Uncompress_" + os.path.basename(theFile),
                              python_callable=expandFile,
                              op_kwargs={'theFile': theFile},
                              dag=dag)
        task.set_upstream(runThis)

The problem is that "runThis" is the PythonOperator that called processFiles, so it has to be declared after processFiles.

Is there any way to make this work?

Is this the reason that XCom exists, and I should dump this approach and go with XCom?

Recommended answer

Regarding your proposed solution, I don't think you can use XComs to achieve this, as they are only available to instances and not when you define the DAG (to the best of my knowledge).

You can however use a SubDAG to achieve your objective. The SubDagOperator takes a function that is invoked when the operator executes and that generates a DAG, giving you a chance to dynamically create a sub-section of your workflow.

You can test the idea using this simple example, which generates a random number of tasks every time it's invoked:

import airflow
from builtins import range
from random import randint
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.models import DAG

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2)
}

dag = DAG(dag_id='dynamic_dag', default_args=args)

def generate_subdag(parent_dag, dag_id, default_args):
    # pseudo-randomly determine a number of tasks to be created
    n_tasks = randint(1, 10)

    subdag = DAG(
        '%s.%s' % (parent_dag.dag_id, dag_id),
        schedule_interval=parent_dag.schedule_interval,
        start_date=parent_dag.start_date,
        default_args=default_args
    )
    for i in range(n_tasks):
        i = str(i)
        task = BashOperator(task_id='echo_%s' % i, bash_command='echo %s' % i, dag=subdag)

    return subdag

subdag_dag_id = 'dynamic_subdag'

SubDagOperator(
    subdag=generate_subdag(dag, subdag_dag_id, args),
    task_id=subdag_dag_id,
    dag=dag
)

If you execute this you'll notice that in different runs the SubDAG is likely to contain a different number of tasks (I tested this with version 1.8.0). You can access the SubDAG view on the WebUI by accessing the graph view, clicking on the grey SubDAG node and then on "Zoom into SubDAG".

You can use this concept by listing the files and creating one task for each of them, instead of generating a random number of tasks as in the example. The tasks themselves can be arranged in parallel (as I did), sequentially, or in any valid directed acyclic layout.
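
As a rough sketch of that adaptation, assuming the hypothetical getFiles and expandFile helpers (and reqDir, gvcfDir, matchSuffix variables) from the question, the generator function could enumerate files instead of drawing a random count:

import os
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

def generate_uncompress_subdag(parent_dag, dag_id, default_args):
    subdag = DAG(
        '%s.%s' % (parent_dag.dag_id, dag_id),
        schedule_interval=parent_dag.schedule_interval,
        start_date=parent_dag.start_date,
        default_args=default_args
    )
    # one uncompress task per matching file; independent tasks can run in parallel
    for theFile in getFiles(reqDir, gvcfDir, matchSuffix):  # hypothetical helper
        PythonOperator(task_id='Uncompress_' + os.path.basename(theFile),
                       python_callable=expandFile,          # hypothetical helper
                       op_kwargs={'theFile': theFile},
                       dag=subdag)
    return subdag

As with randint in the example above, the file listing runs when the DAG file is parsed, so the set of generated tasks can change from run to run.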
