How to really create n tasks in a SubDAG based on the result of a previous task


Problem description


I'm creating a dynamic DAG in Airflow using SubDAGs. What I need is for the number of tasks inside the SubDAG to be determined by the result of a previous task (the subtask_ids variable of the middle_section function should be the same as the subtask_ids variable of the initial_task function).


The thing is that I can't access XCom inside the subdag function of a SubDagOperator, because I don't have any context there. Also, I can't reach any DB to read some value, because of the scheduler's DAG autodiscovery feature: middle_section is executed every few seconds.
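For context, xcom_pull only exists on a TaskInstance, which you only get at run time inside a task callable. A minimal sketch of the difference (the task id and key match the code below, the rest is only illustrative):

def some_runtime_task(**context):
    # At run time the context exposes the TaskInstance ('ti'), so values pushed
    # to XCom by earlier tasks can be read here.
    ti = context['ti']
    subtask_ids = ti.xcom_pull(task_ids='start_task', key='depot_ids')
    print(subtask_ids)

# middle_section(), by contrast, runs while the scheduler parses the DAG file,
# before any task instance exists, so there is no 'ti' and nothing to pull from.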


How do you guys solve this? Create a dynamic number of tasks inside a SubDAG depending on the result of a previous task?


Here is the code I'm developing:

import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
}


def initial_task(**context):
    subtask_ids = [0, 1, 2]
    task_instance = context['ti']
    task_instance.xcom_push(key='depot_ids', value=subtask_ids)


def middle_section_task(subtask_id):
    print(subtask_id)


def middle_section(parent_dag, args):
    subdag = DAG(dag_id=f'{parent_dag.dag_id}.middle',
                 default_args=args, schedule_interval='@once')

    subtask_ids = ''  # This should be read from XCom, but there is no context here

    for subtask_id in subtask_ids:
        PythonOperator(task_id=f'middle_section_task_{subtask_id}',
                       python_callable=middle_section_task,
                       op_kwargs={'subtask_id': subtask_id}, dag=subdag)

    return subdag


def end_task(**context):
    print('Finished')


dag = DAG(dag_id='stackoverflow', default_args=args, schedule_interval=None)

initial = PythonOperator(task_id='start_task', python_callable=initial_task,
                         provide_context=True, dag=dag)

middle = SubDagOperator(task_id='middle', subdag=middle_section(dag, args),
                        default_args=args, dag=dag)

end = PythonOperator(task_id='end_task', python_callable=end_task,
                     provide_context=True, dag=dag)

initial >> middle >> end


Recommended answer


I had the same issue. I couldn't properly solve the problem 100% in an "Airflow way", since I think the number of Airflow tasks and subtasks is defined at the moment of DAG validation, and at validation time no task is run, so there is no way Airflow can know beforehand how many subdag tasks will be scheduled.


The way I circumvented this issue might not be the best (I'm open to suggestions) but it works:

main_dag.py

# imports omitted for brevity
def get_info_from_db():
    # get info from db or somewhere else, this info will define the number of subdag tasks to run
    return urls, names

dag = DAG(...)

urls, names = get_info_from_db()

# You may ignore the dummy operators
start = DummyOperator(task_id='start', default_args=args, dag=dag)
sub_section = SubDagOperator(
    task_id='import-file',
    subdag=imported_subdag(DAG_NAME, 'subdag-name', args, urls=urls, file_names=names),
    default_args=args,
    dag=dag,
)
end = DummyOperator(task_id='end', default_args=args, dag=dag)

start.set_downstream(sub_section)
sub_section.set_downstream(end)
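Just to illustrate what get_info_from_db() might look like, here is a hypothetical sketch using Airflow's PostgresHook; the connection id and table name are made up and not part of the original answer:

# Hypothetical sketch only: connection id 'my_db' and table 'files' are made-up names.
from airflow.hooks.postgres_hook import PostgresHook

def get_info_from_db():
    hook = PostgresHook(postgres_conn_id='my_db')
    rows = hook.get_records('SELECT file_url, file_name FROM files')
    urls = [row[0] for row in rows]
    names = [row[1] for row in rows]
    return urls, names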


Then finally I have my subdag.py (make sure it is discoverable by Airflow, in case it is in a separate file):

# imports omitted for brevity
def fetch_files(file_url, file_name):
    # get file and save it to disk
    return file_location

# this is how I get info returned from the previous task: fetch_files
def validate_file(task_id, **kwargs):
    ti = kwargs['ti']
    task = 'fetch_file-{}'.format(task_id)
    file_location = ti.xcom_pull(task_ids=task)  # the return value of fetch_files

def imported_subdag(parent_dag_name, child_dag_name, args, urls, file_names):
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )
    for i in range(len(urls)):
        # the task name should also be dynamic in order not to have duplicates
        validate_file_operator = PythonOperator(task_id='validate_file-{}'.format(i+1),
                                                python_callable=validate_file,
                                                provide_context=True, dag=dag_subdag, op_kwargs={'task_id': i + 1})
        fetch_operator = PythonOperator(task_id='fetch_file-{}'.format(i+1),
                                        python_callable=fetch_files, dag=dag_subdag,
                                        op_kwargs={'file_url': urls[i], 'file_name': file_names[i]})
        fetch_operator.set_downstream(validate_file_operator)
    return dag_subdag
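The hand-off between fetch_file-N and validate_file-N relies on the fact that, by default, whatever a PythonOperator callable returns is pushed to XCom, and xcom_pull with only task_ids retrieves that return value. A tiny standalone illustration (task id and path are made up):

def producer():
    return '/tmp/some_file.csv'  # pushed to XCom automatically as the return value

def consumer(**kwargs):  # needs provide_context=True on the operator
    ti = kwargs['ti']
    path = ti.xcom_pull(task_ids='producer_task')  # reads the return value above
    print('got', path)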


Basically my logic is that at the moment of validation by Airflow, get_info_from_db() gets executed and all DAGs and subdags are properly scheduled dynamically. If I add or remove content from the DB, the number of tasks to be run will be updated on the next DAG validation.
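One caveat: because the scheduler re-parses the DAG file every few seconds, get_info_from_db() runs on every parse. If that query is expensive, a simple file-based cache can keep parsing cheap. A rough sketch only (the caching is not part of the original answer, and the cache path is a made-up name), reusing get_info_from_db() from main_dag.py above:

import json
import os
import time

CACHE_PATH = '/tmp/stackoverflow_dag_info.json'  # hypothetical cache location

def get_info_from_db_cached(ttl_seconds=300):
    # Reuse the cached result for ttl_seconds so the external DB is not queried
    # on every scheduler parse of this file.
    if os.path.exists(CACHE_PATH) and time.time() - os.path.getmtime(CACHE_PATH) < ttl_seconds:
        with open(CACHE_PATH) as f:
            cached = json.load(f)
        return cached['urls'], cached['names']
    urls, names = get_info_from_db()
    with open(CACHE_PATH, 'w') as f:
        json.dump({'urls': urls, 'names': names}, f)
    return urls, names

urls, names = get_info_from_db_cached()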


This approach suited my use case, but I hope in the future Airflow supports this feature (dynamic number of tasks/subdag.tasks) natively.
