如何检查来自 Airflow SimpleHttpOperator 的 HTTP 响应代码? [英] How do I check the HTTP response code from an Airflow SimpleHttpOperator?
问题描述
我正在尝试从触发的 Airflow SimpleHttpOperator 接收 HTTP 响应代码.我已经看到使用 'lambda' 类型的示例,并且通过查看响应的正文来这样做,但我希望能够将响应代码传递给一个函数.我当前的代码(其中 90% 来自 example_http_operator):
I'm trying to receive the HTTP response code back from a triggered Airflow SimpleHttpOperator. I've seen examples using 'lambda' type, and am doing so by looking in the body of the response, but I was hoping to be able to pass the response code off to a function. My current code (which is 90% from example_http_operator):
import json
from datetime import timedelta
from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.sensors.http_sensor import HttpSensor
from airflow.utils.dates import days_ago
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(2),
'email': ['me@company.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
}
dag = DAG(dag_id='kick_off_java_task', default_args=default_args)
kickoff_task = SimpleHttpOperator(
task_id='kick_off_c2c_java_task',
http_conn_id='test',
method='GET',
endpoint='',
data={ "command": "run" },
response_check=lambda response: True if "Ok Message" in response.text else False,
headers={},
xcom_push=False,
dag=dag
)
根据文档和代码,似乎有一种方法可以让 response_check 指向可调用对象,但我不清楚语法,或者我是否需要转向完全不同的方向,例如利用 xcom.
According to the documentation and code, there appears to be a way to have response_check point to a callable, but I am unclear of the syntax, or if I need to head in a completely different direction, such as leveraging xcom.
推荐答案
经过一些尝试和错误,解决方案变得非常简单:
After a little trial and error, the solution turns out to be pretty simple:
dag = DAG(dag_id='kick_off_java_task', default_args=default_args)
def check(response):
if response == 200:
print("Returning True")
return True
else:
print("Returning False")
return False
kickoff_task = SimpleHttpOperator(
task_id='kick_off_c2c_java_task',
http_conn_id='c2c_test',
method='GET',
endpoint='',
data={ "command": "run" },
response_check=lambda response: True if check(response.status_code) is True else False,
headers={},
xcom_push=False,
dag=dag
)
在 lambda 中使用之前定义了 python 函数check",我可以将参数response.status_code"传递给该函数.
having the python function "check" defined before its use in the lambda, I can pass the parameter "response.status_code" to that function.
这篇关于如何检查来自 Airflow SimpleHttpOperator 的 HTTP 响应代码?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!