How can I return a list from a PythonOperator in Airflow and use it as an argument for subsequent tasks in a DAG?

Question

I have 3 tasks to run in the same DAG. Task1 returns a list of dictionaries, and Task2 and Task3 each try to use one dictionary element from the result returned by Task1.

def get_list():
    ....
    return listOfDict

def parse_1(example_dict):
    ...

def parse_2(example_dict):
    ...

dag = DAG('dagexample', default_args=default_args)
data_list = PythonOperator(
    task_id='get_lists',
    python_callable=get_list,
    dag=dag)
for data in data_list:
    sub_task1 = PythonOperator(
        task_id='data_parse1' + data['id'],
        python_callable=parse_1,
        op_kwargs={'dataObject': data},
        dag=dag,
     )
    sub_task2 = PythonOperator(
        task_id='data_parse2' + data['id'],
        python_callable=parse_2,
        op_kwargs={'dataObject': data},
        dag=dag,
     )

Recommended answer

You should use XCom to pass variables/messages between different tasks. Take a look at this example: https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_xcom.py
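To make the idea concrete, here is a toy model of the mechanism in plain Python. It is not Airflow code: `ToyXComStore`, `run_task`, and the sample data are hypothetical names invented for illustration. The only Airflow behaviour it mirrors is that a PythonOperator stores the return value of its callable in XCom under the key `return_value`, where a later task can pull it by task id.

```python
# Toy model of XCom: NOT Airflow code, only an illustration of the idea.
class ToyXComStore:
    """Stores values keyed by (task_id, key), loosely like Airflow's XCom table."""

    def __init__(self):
        self._values = {}

    def push(self, task_id, key, value):
        self._values[(task_id, key)] = value

    def pull(self, task_ids, key="return_value"):
        # Returns None when nothing was pushed for this task and key.
        return self._values.get((task_ids, key))


def run_task(store, task_id, python_callable):
    """Run a callable and store its return value under 'return_value'."""
    result = python_callable()
    if result is not None:
        store.push(task_id, "return_value", result)
    return result


def get_list():
    # Hypothetical data standing in for the elided body in the question.
    return [{"id": "a", "value": 1}, {"id": "b", "value": 2}]


store = ToyXComStore()
run_task(store, "get_lists", get_list)

# A downstream task would pull the list by the upstream task id.
pulled = store.pull(task_ids="get_lists")
```

In real Airflow the store is the metadata database, so the returned value has to be serializable and should stay small.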

For your case, it should look something like the following:

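import airflow.utils.dates
from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x import path

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True,  # Needed on Airflow 1.x so the callables receive `ti` in kwargs
}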


def get_list():
    ...  # build listOfDict here (elided in the original)
    return listOfDict

def parse_1(**kwargs):
    ti = kwargs['ti']

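    # Pull the list of dicts returned by the 'get_lists' task
    v1 = ti.xcom_pull(key=None, task_ids='get_lists')

    # v1 can now be used as a normal Python list of dicts;
    # the entry passed via op_kwargs is available as kwargs['dataObject']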
    ...

def parse_2(**kwargs):
    ti = kwargs['ti']

    # get listOfDict
    v1 = ti.xcom_pull(key=None, task_ids='get_lists')
    ...

dag = DAG('dagexample', default_args=default_args)
data_list = PythonOperator(
    task_id='get_lists',
    python_callable=get_list,
    dag=dag)

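# Note: get_list() is called here every time the DAG file is parsed,
# because the number of tasks must be known when the DAG is built.
for data in get_list():
    sub_task1 = PythonOperator(
        task_id='data_parse1' + data['id'],
        python_callable=parse_1,
        op_kwargs={'dataObject': data},
        dag=dag,
    )

    sub_task2 = PythonOperator(
        task_id='data_parse2' + data['id'],
        python_callable=parse_2,
        op_kwargs={'dataObject': data},
        dag=dag,
    )

    # Run the parse tasks only after 'get_lists' has pushed its result to XCom
    data_list >> sub_task1
    data_list >> sub_task2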
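Since each parse task only needs one dictionary from the list, the callable can select it after pulling. The sketch below shows one possible way to do that and to check it without a running Airflow instance. `StubTaskInstance`, the `data_id` keyword, and the sample data are assumptions made for illustration; the stub only mimics the `xcom_pull(task_ids=..., key=...)` call used in the answer.

```python
class StubTaskInstance:
    """Hypothetical stand-in for the `ti` object; it only mimics xcom_pull."""

    def __init__(self, xcom_values):
        self._xcom_values = xcom_values

    def xcom_pull(self, task_ids, key=None):
        # Return whatever the named upstream task "returned", or None.
        return self._xcom_values.get(task_ids)


def parse_1(**kwargs):
    ti = kwargs["ti"]
    wanted_id = kwargs["data_id"]  # hypothetical entry passed through op_kwargs

    # Pull the whole list, falling back to an empty list if nothing was pushed.
    list_of_dict = ti.xcom_pull(key=None, task_ids="get_lists") or []

    # Pick the single dictionary this task is responsible for.
    for item in list_of_dict:
        if item["id"] == wanted_id:
            return item
    return None


# Sample data standing in for the result of the 'get_lists' task.
stub = StubTaskInstance(
    {"get_lists": [{"id": "a", "value": 1}, {"id": "b", "value": 2}]}
)
selected = parse_1(ti=stub, data_id="b")
```

Passing only an id through `op_kwargs` and reading the payload from XCom at run time means the task works on the data that `get_lists` actually produced in that run, rather than on whatever `get_list()` returned when the DAG file was parsed.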
