如何在气流 dag 中将参数从 pythonoperator 任务传递到 simplehttpoperator 任务? [英] how to pass parameters from pythonoperator task to simplehttpoperator task in airflow dag?
问题描述
我想触发一个简单的httpoperator,像这样:气流trigger_dag test_trigger --conf '{"name":"something"}'
i want to trigger a simplehttpoperator,like this: airflow trigger_dag test_trigger --conf '{"name":"something"}'
然后我使用 pythonoperator python_callable 通过使用 kwargs['dag_run'].conf 接受参数,并且我想将 ['dag_run'].conf 传递给 simplehttpoperator,我该怎么做?有人可以帮忙吗?
then i use a pythonoperator python_callable to accept parameters by using kwargs['dag_run'].conf , and i want to pass the ['dag_run'].conf to simplehttpoperator, how can i do it? anyone can help?
cc_ = {}
def run_this_func(ds, **kwargs):
cc_ = kwargs['dag_run'].conf
logging.info(cc_)
return cc_
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag)
http_task = SimpleHttpOperator(
task_id='http_task',
http_conn_id='test_http',
method='POST',
endpoint='/api/v1/function',
data=cc_,
headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*"},
response_check=lambda response: True if "10000" in response.content else False,
dag=dag)
http_task.set_upstream(run_this)
推荐答案
感谢 @Chengzhi 和 @Daniel.最后我在 Jinja2/filter.py 中写了一个自定义过滤器 'tojson',因为在气流中默认的 Jinja2 版本是 2.8.1 并且 Jinja2 不包含名为 'tojson' 的内置过滤器,直到版本 2.9 .
Thanks to @Chengzhi and @Daniel. At last I wrote a custom filter 'tojson' in Jinja2/filter.py,because in airflow the default Jinja2 version is 2.8.1 and Jinja2 does not contain the builtin filter named 'tojson' until version 2.9 .
def do_tojson(value):
value = json.JSONEncoder().encode(value)
return value
在dag文件中,代码如下.它有效.
In dag file, the code as follows. It works.
def run_this_func(ds, **kwargs):
cc_ = kwargs['dag_run'].conf
return cc_
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag)
http_task = SimpleHttpOperator(
task_id='http_task',
http_conn_id='test_http',
method='POST',
endpoint='/api/v1/task',
data="{{ task_instance.xcom_pull(task_ids='run_this') |tojson}}",
headers={"Authorization": "Basic YWRtaW46MTIzNDU2", "Accept": "application/json, text/plain, */*",
"Content-Type": "application/json"},
response_check=lambda response: True if "10000" in response.content else False,
dag=dag)
http_task.set_upstream(run_this)
这篇关于如何在气流 dag 中将参数从 pythonoperator 任务传递到 simplehttpoperator 任务?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!