如何解析气流模板中的json字符串 [英] How to parse json string in airflow template

查看:71
本文介绍了如何解析气流模板中的json字符串的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

是否可以解析气流模板中的JSON字符串?

Is it possible to parse JSON string inside an airflow template?

我有一个HttpSensor,它通过REST API监视作业,但是作业ID在带有 xcom_push 标记为 True 的上游任务的响应。

I have a HttpSensor which monitors a job via a REST API, but the job id is in the response of the upstream task which has xcom_push marked True.

我想做类似下面的事情,但是,这段代码给出了错误 jinja2.exceptions.UndefinedError:'json'is undefined

I would like to do something like the following, however, this code gives the error jinja2.exceptions.UndefinedError: 'json' is undefined

t1 = SimpleHttpOperator(
    http_conn_id="s1",
    task_id="job",
    endpoint="some_url",
    method='POST',
    data=json.dumps({ "foo": "bar" }),
    xcom_push=True,
    dag=dag,
)

t2 = HttpSensor(
    http_conn_id="s1",
    task_id="finish_job",
    endpoint="job/{{ json.loads(ti.xcom_pull(\"job\")).jobId }}",
    response_check=lambda response: True if response.json().state == "complete" else False,
    poke_interval=5,
    dag=dag
)

t2.set_upstream(t1)


推荐答案

您可以添加自定义的Jinja过滤器使用 user_defined_filters

You can add a custom Jinja filter to your DAG with the parameter user_defined_filters to parse the json.


一个过滤器字典,该过滤器将在jinja模板中显示
。例如,将
dict(hello = lambda name:'Hello%s'%name)传递给此参数将允许
{{'world'|您好}} 在与该DAG的
相关的所有Jinja模板中。

a dictionary of filters that will be exposed in your jinja templates. For example, passing dict(hello=lambda name: 'Hello %s' % name) to this argument allows you to {{ 'world' | hello }} in all jinja templates related to this DAG.



dag = DAG(
    ...
    user_defined_filters={'fromjson': lambda s: json.loads(s)},
)

t1 = SimpleHttpOperator(
    task_id='job',
    xcom_push=True,
    ...
)

t2 = HttpSensor(
    endpoint='job/{{ (ti.xcom_pull("job") | fromjson)["jobId"] }}',
    ...
)

但是,只编写自己的自定义 JsonHttpOperator 插件(或在 SimpleHttpOperator 中添加标志)在返回之前解析JSON这样您就可以直接在模板中引用 {{ti.xcom_pull( job)[ jobId]

However, it may be cleaner to just write your own custom JsonHttpOperator plugin (or add a flag to SimpleHttpOperator) that parses the JSON before returning so that you can just directly reference {{ti.xcom_pull("job")["jobId"] in the template.

class JsonHttpOperator(SimpleHttpOperator):

    def execute(self, context):
        text = super(JsonHttpOperator, self).execute(context)
        return json.loads(text)

这篇关于如何解析气流模板中的json字符串的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆