SimpleHttpOperator Airflow, data templated


Problem description

I'm trying to render data correctly inside a SimpleHttpOperator in Airflow, using configuration that I send via dag_run:

result = SimpleHttpOperator(
        task_id="schema_detector",
        http_conn_id='schema_detector',
        endpoint='api/schema/infer',
        method='PUT',
        data=json.dumps({
            'url': '{{ dag_run.conf["url"] }}',
            'fileType': '{{ dag_run.conf["fileType"] }}',
        }),
        response_check=lambda response: response.ok,
        response_filter=lambda response: response.json())

The issue is that the rendered data appears like this:

{"url": "{{ dag_run.conf[\"url\"] }}", "fileType": "{{ dag_run.conf[\"fileType\"] }}"}

I'm not sure what I'm doing wrong here.

EDIT: Full code below.

import json
import pprint

from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(0),
}


def print_result(**kwargs):
    ti = kwargs['ti']
    pulled_value_1 = ti.xcom_pull(task_ids='schema_detector')
    pprint.pprint(pulled_value_1)


with DAG(
    dag_id='airflow_http_operator',
    default_args=default_args,
    catchup=False,
    schedule_interval="@once",
    tags=['http']
) as dag:

    result = SimpleHttpOperator(
        task_id="schema_detector",
        http_conn_id='schema_detector',
        endpoint='api/schema/infer',
        method='PUT',
        headers={"Content-Type": "application/json"},
        data=json.dumps({
            'url': '{{ dag_run.conf["url"] }}',
            'fileType': '{{ dag_run.conf["fileType"] }}',
        }),
        response_check=lambda response: response.ok,
        response_filter=lambda response: response.json())

    pull = PythonOperator(
        task_id='print_result',
        python_callable=print_result,
        provide_context=True,  # Airflow 1.10.x: pass ti/dag_run etc. to the callable
    )
    result >> pull
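For reference, the dag_run.conf values in the templates come from the configuration the DAG is triggered with; with the Airflow 1.10 CLI that could look like this (URL and file type are placeholder values):

airflow trigger_dag airflow_http_operator -c '{"url": "https://example.com/data.csv", "fileType": "csv"}'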

Recommended answer

I struggled a lot due to the same error, so I created my own operator (ExtendedHttpOperator), which is a combination of PythonOperator and SimpleHttpOperator. This worked for me :)

This operator accepts a function in which we can collect the data passed to the DAG run (via dag_run.conf) and parse it, if required, before sending it to the API. Because the payload is built inside execute() from the live context, it does not depend on Jinja rendering of the data argument.

from plugins.operators.extended_http_operator import ExtendedHttpOperator
import json


def passing_data(**context):
    # op_kwargs are merged into the context by ExtendedHttpOperator,
    # so "api" is available here alongside dag_run, ti, etc.
    api = context["api"]
    dag_run_conf = context["dag_run"].conf
    return json.dumps(dag_run_conf[api])


def validate_response(res):
    return res.status_code == 200


testing_extend = ExtendedHttpOperator(
    task_id="process_user_ids",
    http_conn_id="user_api",
    endpoint="/kafka",
    headers={"Content-Type": "application/json"},
    data_fn=passing_data,
    op_kwargs={"api": "kafka"},
    method="POST",
    log_response=True,
    response_check=validate_response,
)
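Since passing_data reads dag_run.conf["kafka"] (selected through op_kwargs={"api": "kafka"}), the DAG is expected to be triggered with a conf containing a kafka key; a hypothetical trigger could look like this (DAG id and payload are placeholders):

airflow trigger_dag your_dag_id -c '{"kafka": {"user_ids": [1, 2, 3]}}'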

Here is how you can add ExtendedHttpOperator to your Airflow project:

Put the extended_http_operator.py file inside the your_airflow_project/plugins/operators folder.
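The resulting layout (your_airflow_project standing in for your Airflow home directory) would look like this:

your_airflow_project/
├── dags/
└── plugins/
    ├── __init__.py
    └── operators/
        ├── __init__.py
        └── extended_http_operator.py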

# extended_http_operator.py file

from typing import Dict, Optional

from airflow.exceptions import AirflowException
from airflow.hooks.http_hook import HttpHook
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults

"""
Extends SimpleHttpOperator with a callable that formulates the request data.
The data function has access to the task context (task instance, dag_run, ...),
which allows us to write cleaner code rather than one long template line to
formulate the JSON data.
"""


class ExtendedHttpOperator(SimpleHttpOperator):
    @apply_defaults
    def __init__(
        self,
        data_fn,
        log_response: bool = False,
        op_kwargs: Optional[Dict] = None,
        *args,
        **kwargs
    ):
        super(ExtendedHttpOperator, self).__init__(*args, **kwargs)
        if not callable(data_fn):
            raise AirflowException("`data_fn` param must be callable")
        self.data_fn = data_fn
        self.context = None
        self.op_kwargs = op_kwargs or {}
        self.log_response = log_response

    def execute(self, context):
        # Merge op_kwargs into the context so data_fn sees them alongside
        # the standard template variables (dag_run, ti, ...).
        context.update(self.op_kwargs)
        self.context = context
        http = HttpHook(self.method, http_conn_id=self.http_conn_id)

        # Build the payload at execute time from the live context instead of
        # relying on Jinja rendering of the `data` argument.
        data_result = self.execute_callable(context)

        self.log.info("Calling HTTP method")
        self.log.info("Post Data: {}".format(data_result))
        response = http.run(
            self.endpoint, data_result, self.headers, self.extra_options
        )
        if self.log_response:
            self.log.info(response.text)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Response check returned False")
        # Return the body so it is pushed to XCom, mirroring SimpleHttpOperator.
        return response.text

    def execute_callable(self, context):
        return self.data_fn(**context)

Don't forget to create empty __init__.py files inside the plugins and plugins/operators folders.

