Airflow 2.0.0+ - Pass a Dynamically Generated Dictionary to DAG Triggered by TriggerDagRunOperator


Question

Previously, I was using the python_callable parameter of the TriggerDagRunOperator to dynamically alter the dag_run_obj payload that is passed to the newly triggered DAG.

Since its removal in Airflow 2.0.0 (Pull Req: https://github.com/apache/airflow/pull/6317), is there a way to do this, without creating a custom TriggerDagRunOperator?

For context, here is the flow of my code:

#Poll Amazon S3 bucket for new received files
fileSensor_tsk = S3KeySensor()

#Use chooseDAGBasedOnInput function to create dag_run object (previously python_callable was used directly in TriggerDagRunOperator to create the dag_run object for the new triggered DAG)
#dag_run object will pass received file name details to new DAG for reference in order to complete its own work
chooseDAGTrigger_tsk = BranchPythonOperator(
    task_id='chooseDAGTrigger_tsk',
    python_callable=chooseDAGBasedOnInput,
    provide_context=True
)

triggerNewDAG_tsk = TriggerDagRunOperator(
    task_id='triggerNewDAG_tsk',
    trigger_dag_id='1000_NEW_LOAD'
)

triggerNewDAG2_tsk = TriggerDagRunOperator(
    task_id='triggerNew2DAG_tsk',
    trigger_dag_id='1000_NEW2_LOAD'
) ...
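
For reference, a minimal sketch of what `chooseDAGBasedOnInput` might look like (the callable is not shown in the question, so the file-name prefix check below is a hypothetical example): a branch callable returns the `task_id` of the downstream task to follow.

```python
# Hypothetical sketch of chooseDAGBasedOnInput: pick which trigger task
# runs based on the received file's name. The "NEW2" prefix rule is an
# assumption for illustration; the task_ids match the operators above.
def chooseDAGBasedOnInput(**context):
    file_name = context.get("file_name", "")
    if file_name.startswith("NEW2"):
        return "triggerNew2DAG_tsk"
    return "triggerNewDAG_tsk"
```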

Any help or commentary would be greatly appreciated!

EDIT - adding the python_callable function previously used in the TriggerDagRunOperator:

def intakeFile(context, dag_run_obj):

    #read from S3, get filename and pass to triggered DAG
    bucket_name = os.environ.get('bucket_name')
    s3_hook = S3Hook(aws_conn_id='aws_default')
    s3_hook.copy_object()
    s3_hook.delete_objects()
    ...

    dag_run_obj.payload = {
        'filePath': workingPath,
        'source': source,
        'fileName': fileName
    }

    return dag_run_obj

Answer

The TriggerDagRunOperator now takes a conf parameter, to which a dictionary can be provided as the conf object for the DagRun. Here is more information on triggering DAGs, which you may find helpful as well.
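
To illustrate the receiving side (a sketch; the task and key names are hypothetical): whatever dictionary the parent passes as conf arrives in the triggered DAG as dag_run.conf in the task context.

```python
# Parent DAG (sketch, Airflow 2.x):
#   TriggerDagRunOperator(
#       task_id="triggerNewDAG_tsk",
#       trigger_dag_id="1000_NEW_LOAD",
#       conf={"fileName": "data.csv"},  # static dict, or a templated value
#   )
#
# Triggered DAG: the dictionary arrives as dag_run.conf in the context
# of each task. Key names here are assumptions for illustration.
def process_received_file(**context):
    conf = context["dag_run"].conf or {}
    return conf.get("fileName")
```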

EDIT

Since you need to execute a function to determine which DAG to trigger, and do not want to create a custom TriggerDagRunOperator, you could execute intakeFile() in a PythonOperator (or use the @task decorator with the TaskFlow API) and use the return value as the conf argument in the TriggerDagRunOperator. As part of Airflow 2.0, return values are automatically pushed to XCom in many operators, the PythonOperator included.
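
A toy illustration, in plain Python rather than Airflow internals, of what "return values are automatically pushed to XCom" means in practice: the operator stores its callable's return value under the key "return_value", and downstream templates retrieve it with ti.xcom_pull(...).

```python
# Simplified stand-in for Airflow's XCom mechanics (not real Airflow
# code): run_python_callable mimics PythonOperator.execute pushing the
# return value, and xcom_pull mimics how templates read it back.
xcom_store = {}

def run_python_callable(task_id, python_callable):
    xcom_store[(task_id, "return_value")] = python_callable()

def xcom_pull(task_ids, key="return_value"):
    return xcom_store[(task_ids, key)]

run_python_callable("get_dag_to_trigger", lambda: {"fileName": "a.csv"})
payload = xcom_pull(task_ids="get_dag_to_trigger")
```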

Here is the general idea:

def intakeFile(*args, **kwargs):

    # read from S3, get filename and pass to triggered DAG
    bucket_name = os.environ.get("bucket_name")
    s3_hook = S3Hook(aws_conn_id="aws_default")
    s3_hook.copy_object()
    s3_hook.delete_objects()
    ...

    # There is no dag_run_obj in Airflow 2.0; instead, return what the
    # downstream TriggerDagRunOperator needs. The PythonOperator pushes
    # this return value to XCom automatically.
    return {
        "dag_id": target_dag_id,  # which DAG to trigger, determined by the elided logic above
        "conf": {
            "filePath": workingPath,
            "source": source,
            "fileName": fileName,
        },
    }


get_dag_to_trigger = PythonOperator(
    task_id="get_dag_to_trigger",
    python_callable=intakeFile,
)

triggerNewDAG_tsk = TriggerDagRunOperator(
    task_id="triggerNewDAG_tsk",
    trigger_dag_id="{{ ti.xcom_pull(task_ids='get_dag_to_trigger')['dag_id'] }}",
    # conf is a templated field in recent Airflow 2 versions; note that
    # rendering a dict through Jinja requires render_template_as_native_obj=True
    # on the DAG, otherwise the triggered run receives it as a string
    conf="{{ ti.xcom_pull(task_ids='get_dag_to_trigger')['conf'] }}",
)

get_dag_to_trigger >> triggerNewDAG_tsk
