如何在Airflow中的调用DAG中捕获传递的--conf参数 [英] How to capture passed --conf parameter in called DAG in Airflow

查看:64
本文介绍了如何在Airflow中的调用DAG中捕获传递的--conf参数的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从 REST API 运行 DAG 并将一些参数传递给它.DAG 应该能够捕获参数并使用它.问题是我能够从 REST API 触发 DAG,但 DAG 无法捕获传递的参数.有没有办法做到这一点?

我正在从 REST API 触发 DAG,如下所示.它在 --conf 中传递参数

http://abcairflow.com:8090/admin/rest_api/api?api=trigger_dag\&dag_id=trigger_test_dag\&conf=%7B%22key%22%3A%2

如何在被调用的 DAG 中捕获 conf value 中传递的值.据我所知,conf 应该采用 URL 编码的 JSON 格式数据.

DAG 代码:`

def run_this_func(**kwargs):打印(kwargs)run_this = PythonOperator(task_id='run_this',python_callable=run_this_func,达格=达格)`

解决方案

传递的外部参数是 dag_run 对象的一部分.可以通过以下方式访问它们:

<块引用>

API 请求

导入请求标题 = {'缓存控制':'无缓存','内容类型':'应用程序/json',}数据 = '{"conf":"{\\"Key1\\":\\"Value1\\"}"}'response = requests.post('http://localhost:8080/api/experimental/dags//dag_runs', headers=headers, data=data)

<块引用>

DAG

 def run_this_func(**context):打印(收到{}密钥=消息".格式(上下文[dag_run"].conf))run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, provide_context=True, dag=dag)

I am trying to run a DAG from REST API and pass some parameters to it. The DAG should be able to catch the parameters and use it. The problem is I am able to trigger the DAG from REST API,but the DAG is not able to catch the parameters passed. Is there a way to achieve this?

I am triggering the DAG from REST API as below.It passes the parameters in --conf

http://abcairflow.com:8090/admin/rest_api/api?api=trigger_dag\&dag_id=trigger_test_dag\&conf=%7B%22key%22%3A%2

How to capture the values passed in conf value in the called DAG. As far as I know the conf should take the URL encoded JSON format data.

DAG code:`

def run_this_func(**kwargs):
print(kwargs)

run_this = PythonOperator(
    task_id='run_this',
    python_callable=run_this_func,
    dag=dag
)`

解决方案

The external parameters passed are part of the dag_run object. They can be accessed as follows:

API Request

import requests

headers = {
    'Cache-Control': 'no-cache',
    'Content-Type': 'application/json',
}

data = '{"conf":"{\\"Key1\\":\\"Value1\\"}"}'

response = requests.post('http://localhost:8080/api/experimental/dags/<dag_id>/dag_runs', headers=headers, data=data)

DAG

 def run_this_func(**context):
    print("Received {} for key=message".format(context["dag_run"].conf))


run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, provide_context=True, dag=dag)

这篇关于如何在Airflow中的调用DAG中捕获传递的--conf参数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆