如何检查来自 Airflow SimpleHttpOperator 的 HTTP 响应代码? [英] How do I check the HTTP response code from an Airflow SimpleHttpOperator?

查看:42
本文介绍了如何检查来自 Airflow SimpleHttpOperator 的 HTTP 响应代码?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从触发的 Airflow SimpleHttpOperator 接收 HTTP 响应代码.我已经看到使用 'lambda' 类型的示例,并且通过查看响应的正文来这样做,但我希望能够将响应代码传递给一个函数.我当前的代码(其中 90% 来自 example_http_operator):

I'm trying to receive the HTTP response code back from a triggered Airflow SimpleHttpOperator. I've seen examples using 'lambda' type, and am doing so by looking in the body of the response, but I was hoping to be able to pass the response code off to a function. My current code (which is 90% from example_http_operator):

import json
from datetime import timedelta

from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.sensors.http_sensor import HttpSensor
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['me@company.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

dag = DAG(dag_id='kick_off_java_task', default_args=default_args)

kickoff_task = SimpleHttpOperator(
     task_id='kick_off_c2c_java_task',
     http_conn_id='test',
     method='GET',
     endpoint='',
     data={ "command": "run" },
     response_check=lambda response: True if "Ok Message" in response.text else False,
     headers={},
     xcom_push=False,
     dag=dag
)

根据文档和代码,似乎有一种方法可以让 response_check 指向可调用对象,但我不清楚语法,或者我是否需要转向完全不同的方向,例如利用 xcom.

According to the documentation and code, there appears to be a way to have response_check point to a callable, but I am unclear of the syntax, or if I need to head in a completely different direction, such as leveraging xcom.

推荐答案

经过一些尝试和错误,解决方案变得非常简单:

After a little trial and error, the solution turns out to be pretty simple:

dag = DAG(dag_id='kick_off_java_task', default_args=default_args)

def check(response):
    if response == 200:
        print("Returning True")
        return True
    else:
        print("Returning False")
        return False

kickoff_task = SimpleHttpOperator(
     task_id='kick_off_c2c_java_task',
     http_conn_id='c2c_test',
     method='GET',
     endpoint='',
     data={ "command": "run" },
     response_check=lambda response: True if check(response.status_code) is True else False,
     headers={},
     xcom_push=False,
     dag=dag
)

在 lambda 中使用之前定义了 python 函数check",我可以将参数response.status_code"传递给该函数.

having the python function "check" defined before its use in the lambda, I can pass the parameter "response.status_code" to that function.

这篇关于如何检查来自 Airflow SimpleHttpOperator 的 HTTP 响应代码?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆