如何解析气流模板中的json字符串 [英] How to parse json string in airflow template
问题描述
是否可以解析气流模板中的JSON字符串?
Is it possible to parse JSON string inside an airflow template?
我有一个HttpSensor,它通过REST API监视作业,但是作业ID在带有 xcom_push
标记为 True
的上游任务的响应。
I have a HttpSensor which monitors a job via a REST API, but the job id is in the response of the upstream task which has xcom_push
marked True
.
我想做类似下面的事情,但是,这段代码给出了错误 jinja2.exceptions.UndefinedError:'json'is undefined
I would like to do something like the following, however, this code gives the error jinja2.exceptions.UndefinedError: 'json' is undefined
t1 = SimpleHttpOperator(
http_conn_id="s1",
task_id="job",
endpoint="some_url",
method='POST',
data=json.dumps({ "foo": "bar" }),
xcom_push=True,
dag=dag,
)
t2 = HttpSensor(
http_conn_id="s1",
task_id="finish_job",
endpoint="job/{{ json.loads(ti.xcom_pull(\"job\")).jobId }}",
response_check=lambda response: True if response.json().state == "complete" else False,
poke_interval=5,
dag=dag
)
t2.set_upstream(t1)
推荐答案
您可以添加自定义的Jinja过滤器使用 user_defined_filters
You can add a custom Jinja filter to your DAG with the parameter user_defined_filters to parse the json.
一个过滤器字典,该过滤器将在jinja模板中显示
。例如,将
dict(hello = lambda name:'Hello%s'%name)
传递给此参数将允许
您{{'world'|您好}}
在与该DAG的
相关的所有Jinja模板中。
a dictionary of filters that will be exposed in your jinja templates. For example, passing
dict(hello=lambda name: 'Hello %s' % name)
to this argument allows you to{{ 'world' | hello }}
in all jinja templates related to this DAG.
dag = DAG(
...
user_defined_filters={'fromjson': lambda s: json.loads(s)},
)
t1 = SimpleHttpOperator(
task_id='job',
xcom_push=True,
...
)
t2 = HttpSensor(
endpoint='job/{{ (ti.xcom_pull("job") | fromjson)["jobId"] }}',
...
)
但是,只编写自己的自定义 JsonHttpOperator
插件(或在 SimpleHttpOperator
中添加标志)在返回之前解析JSON这样您就可以直接在模板中引用 {{ti.xcom_pull( job)[ jobId]
。
However, it may be cleaner to just write your own custom JsonHttpOperator
plugin (or add a flag to SimpleHttpOperator
) that parses the JSON before returning so that you can just directly reference {{ti.xcom_pull("job")["jobId"]
in the template.
class JsonHttpOperator(SimpleHttpOperator):
def execute(self, context):
text = super(JsonHttpOperator, self).execute(context)
return json.loads(text)
这篇关于如何解析气流模板中的json字符串的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!