我可以使用 TriggerDagRunOperator 将参数传递给触发的 dag 吗?空气流动 [英] Can I use a TriggerDagRunOperator to pass a parameter to the triggered dag? Airflow

查看:32
本文介绍了我可以使用 TriggerDagRunOperator 将参数传递给触发的 dag 吗?空气流动的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用的是 Airflow 1.10.11

Im using Airflow 1.10.11

我可以读到我可以使用 UI 和 CLI 将参数发送到 dag 执行(https://airflow.apache.org/docs/apache-airflow/1.10.11/dag-run.html ),但我可以使用 TriggerDagRunOperator 吗?( https://airflow.apache.org/docs/apache-airflow/1.10.11/_api/airflow/operators/dagrun_operator/index.html).

I can read that I can send parameters to a dag execution using the UI and CLI ( https://airflow.apache.org/docs/apache-airflow/1.10.11/dag-run.html ), but can I do it using a TriggerDagRunOperator ? ( https://airflow.apache.org/docs/apache-airflow/1.10.11/_api/airflow/operators/dagrun_operator/index.html ).

阅读https://github.com/apache/气流/blob/master/airflow/example_dags/example_trigger_target_dag.py , https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py 似乎可以,但是当我自己尝试这个脚本时,我无法检索conf 消息(可能是因为他们似乎在使用气流 2.0,而我使用的是 1.10).

Reading https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_target_dag.py , https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py it seems like I can, but when I tried this script myself I was not able to retrieve the conf message (probably because it seems they are using airflow 2.0 while I am using 1.10).

所以我的问题是:当 TriggerDagRunOperator 是目标时,有没有办法将参数发送到目标 dag?

So my question is: is there a way to send parameter to the target dag when TriggerDagRunOperator that target?

提前致谢

推荐答案

这里是一个示例,演示如何设置由 TriggerDagRunOperator(在 1.10.11 中).

Here is an example that demonstrates how to set the conf sent with dagruns triggered by TriggerDagRunOperator (in 1.10.11).

触发器.py

from datetime import datetime
from airflow.models import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

dag = DAG(
    dag_id='trigger',
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1)
)


def modify_dro(context, dagrun_order):
    print(context)
    print(dagrun_order)
    dagrun_order.payload = {
        "message": "This is my conf message"
    }
    return dagrun_order


run_this = TriggerDagRunOperator(
    task_id='run_this',
    trigger_dag_id='my_dag',
    python_callable=modify_dro,
    dag=dag
)

dag.py

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='my_dag',
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1)
)


def run_this_func(**context):
    print(context["dag_run"].conf)


run_this = PythonOperator(
    task_id='run_this',
    provide_context=True,
    python_callable=run_this_func,
    dag=dag,
)

这是 my_dag DAG

[2021-03-05 16:39:15,649] {logging_mixin.py:112} INFO - {'message': 'This is my conf message'}


TriggerDagRunOperator 接受一个 python_callable,它允许您修改 DagRunOrder.DagRunOrder 是一个抽象结构,它包含运行 ID 和有效载荷,它们将是 在触发目标 DAG 时作为 conf 发送.

这篇关于我可以使用 TriggerDagRunOperator 将参数传递给触发的 dag 吗?空气流动的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆