Proper way to create dynamic workflows in Airflow


Question


Is there any way in Airflow to create a workflow such that the number of tasks B.* is unknown until completion of Task A? I have looked at subdags but it looks like it can only work with a static set of tasks that have to be determined at Dag creation.


Would dag triggers work? And if so could you please provide an example.


I have an issue where it is impossible to know the number of task B's that will be needed to calculate Task C until Task A has been completed. Each Task B.* will take several hours to compute and cannot be combined.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|




Idea #1

I don't like this solution because I have to create a blocking ExternalTaskSensor and all the Task B.* will take between 2-24 hours to complete. So I do not consider this a viable solution. Surely there is an easier way? Or was Airflow not designed for this?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG through python_callable in TriggerDagRunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|
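For reference, the Dag 1 side of Idea #1 could be wired roughly as follows. This is an untested DAG-definition sketch, not code from the question: the import paths assume Airflow 1.x, and the dag/task ids (`dag_1`, `dag_2`, `Task_Dummy_B`, etc.) are hypothetical placeholders.

```python
# Hypothetical sketch of Idea #1's "Dag 1" (Airflow 1.x import paths assumed).
from datetime import datetime

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

with DAG('dag_1', start_date=datetime(2017, 1, 1),
         schedule_interval=None) as dag:
    # Kick off the dynamically built Dag 2 after Task A finishes.
    trigger = TriggerDagRunOperator(
        task_id='trigger_dag_2',
        trigger_dag_id='dag_2')
    # Block until Dag 2's final dummy task completes -- this is the
    # sensor the question objects to, since it can sit for 2-24 hours.
    wait = ExternalTaskSensor(
        task_id='wait_for_dummy_b',
        external_dag_id='dag_2',
        external_task_id='Task_Dummy_B')
    trigger >> wait  # Task A upstream and Task C downstream omitted
```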




Edit 1:

As of now this question still does not have a great answer. I have been contacted by several people looking for a solution.

Answer


Here is how I handled a similar request without any subdags:


First create a method that returns whatever values you want

def values_function(**kwargs):
    # **kwargs absorbs the context injected by provide_context=True below
    return values


Next, create a method that will generate the jobs dynamically:

def group(number, **kwargs):
    # Load the pushed values if the command needs them; the {{ ... }}
    # expression is rendered by Jinja at task run time.
    dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
    return BashOperator(
        task_id='JOB_NAME_{}'.format(number),
        bash_command='script.sh {} {}'.format(dyn_value, number),
        dag=dag)
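To make the templating explicit: the bash_command that group() builds embeds a Jinja xcom_pull expression as a literal string, and Airflow only renders it when the task runs. A standalone sketch of just that string construction (no Airflow needed; `build_command` is a hypothetical helper, not part of the answer):

```python
# Build the same command string that group() passes to BashOperator.
# The {{ ... }} Jinja part is deliberately left for Airflow to render
# at run time; at parse time it is just text.
def build_command(number):
    dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}"
    return 'script.sh {} {}'.format(dyn_value, number)

print(build_command(1))
# script.sh {{ task_instance.xcom_pull(task_ids='push_func') }} 1
```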

Then combine them:

push_func = PythonOperator(
    task_id='push_func',
    provide_context=True,
    python_callable=values_function,
    dag=dag)

complete = DummyOperator(
    task_id='All_jobs_completed',
    dag=dag)

for i in values_function():
    push_func >> group(i) >> complete
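Note that the loop above runs whenever the scheduler parses the DAG file, so values_function() must be computable at parse time; the fan-out is fixed per parse, not decided per run by Task A. A pure-Python sketch of the expansion, using a hypothetical stand-in that returns [0, 1, 2]:

```python
# Hypothetical stand-in for values_function(); the real one must be
# evaluable at DAG-parse time for this pattern to work.
def values_function():
    return [0, 1, 2]

# The for-loop expands into one task per value, mirroring group(i),
# so these are the task_ids the DAG would contain.
task_ids = ['JOB_NAME_{}'.format(i) for i in values_function()]
print(task_ids)  # ['JOB_NAME_0', 'JOB_NAME_1', 'JOB_NAME_2']
```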

