如何在Airflow中的调用DAG中捕获传递的--conf参数 [英] How to capture passed --conf parameter in called DAG in Airflow
问题描述
我正在尝试从 REST API 运行 DAG 并将一些参数传递给它.DAG 应该能够捕获参数并使用它.问题是我能够从 REST API 触发 DAG,但 DAG 无法捕获传递的参数.有没有办法做到这一点?
我正在从 REST API 触发 DAG,如下所示.它在 --conf 中传递参数
http://abcairflow.com:8090/admin/rest_api/api?api=trigger_dag\&dag_id=trigger_test_dag\&conf=%7B%22key%22%3A%2
如何在被调用的 DAG 中捕获 conf value 中传递的值.据我所知,conf 应该采用 URL 编码的 JSON 格式数据.
DAG 代码:`
def run_this_func(**kwargs):打印(kwargs)run_this = PythonOperator(task_id='run_this',python_callable=run_this_func,达格=达格)`
传递的外部参数是 dag_run
对象的一部分.可以通过以下方式访问它们:
API 请求
导入请求标题 = {'缓存控制':'无缓存','内容类型':'应用程序/json',}数据 = '{"conf":"{\\"Key1\\":\\"Value1\\"}"}'response = requests.post('http://localhost:8080/api/experimental/dags//dag_runs', headers=headers, data=data)
<块引用>
DAG
def run_this_func(**context):打印(收到{}密钥=消息".格式(上下文[dag_run"].conf))run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, provide_context=True, dag=dag)
I am trying to run a DAG from REST API and pass some parameters to it. The DAG should be able to catch the parameters and use it. The problem is I am able to trigger the DAG from REST API,but the DAG is not able to catch the parameters passed. Is there a way to achieve this?
I am triggering the DAG from REST API as below.It passes the parameters in --conf
http://abcairflow.com:8090/admin/rest_api/api?api=trigger_dag\&dag_id=trigger_test_dag\&conf=%7B%22key%22%3A%2
How to capture the values passed in conf value in the called DAG. As far as I know the conf should take the URL encoded JSON format data.
DAG code:`
def run_this_func(**kwargs):
print(kwargs)
run_this = PythonOperator(
task_id='run_this',
python_callable=run_this_func,
dag=dag
)`
The external parameters passed are part of the dag_run
object. They can be accessed as follows:
API Request
import requests
headers = {
'Cache-Control': 'no-cache',
'Content-Type': 'application/json',
}
data = '{"conf":"{\\"Key1\\":\\"Value1\\"}"}'
response = requests.post('http://localhost:8080/api/experimental/dags/<dag_id>/dag_runs', headers=headers, data=data)
DAG
def run_this_func(**context):
print("Received {} for key=message".format(context["dag_run"].conf))
run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, provide_context=True, dag=dag)
这篇关于如何在Airflow中的调用DAG中捕获传递的--conf参数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!