Spring-Cloud-Stream-Kafka 自定义健康检查不起作用 [英] Spring-Cloud-Stream-Kafka Custom Health check not working
问题描述
我在 spring-boot(consumer) 应用程序中使用 spring-cloud-stream-kafka.应用程序的运行状况不准确,即使应用程序无法连接到 Kafka(Kafka 代理已关闭)也UP".我已阅读有关 kafka 健康检查的文章.看起来 kafka 健康检查在 spring 执行器健康检查中被禁用.
I am using spring-cloud-stream-kafka in my spring-boot(consumer) application.The health of the app is inaccurate, 'UP' even when the app can't connect to Kafka(Kafka broker is down). I have read articles on kafka health check. It looks like kafka health check is disabled in spring actuator health check.
因此,我设法编写了以下代码来为我的应用启用 kafka 健康检查.我想,我在应用程序配置和我的代码之间缺少一些联系,我没有看到 Kafka 运行状况正常.
So, I managed to write the following code to enable kafka health check for my app. I think, I am missing some connection between the app config and my code and I don't see the Kafka health working.
(1) 我正在创建一个自定义健康指标 bean,如下所示:
(1) I am creating a custom health indicator bean as follows:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.util.ObjectUtils;
@Configuration
@ConditionalOnClass(name = "org.springframework.boot.actuate.health.HealthIndicator")
public class KafkaBinderHealthIndicatorConfiguration {
@Bean
KafkaBinderHealthIndicator healthIndicator(KafkaMessageChannelBinder kafkaMessageChannelBinder,
KafkaBinderConfigurationProperties configurationProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
Map<String, Object> mergedConfig = configurationProperties.getConsumerConfiguration();
if (!ObjectUtils.isEmpty(mergedConfig)) {
props.putAll(mergedConfig);
}
if (!props.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, configurationProperties.getKafkaConnectionString());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
KafkaBinderHealthIndicator indicator = new KafkaBinderHealthIndicator(kafkaMessageChannelBinder, consumerFactory);
indicator.setTimeout(configurationProperties.getHealthTimeout());
return indicator;
}
}
(2) 创建活页夹配置:
(2) Created binder config:
import java.io.IOException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.context.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder;
import org.springframework.cloud.stream.binder.kafka.admin.Kafka10AdminUtilsOperation;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
@Configuration
@ConditionalOnMissingBean(Binder.class)
@Import({ KafkaAutoConfiguration.class, PropertyPlaceholderAutoConfiguration.class,
KafkaBinderHealthIndicatorConfiguration.class })
@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class })
public class KafkaBinderConfiguration {
@Autowired
private KafkaExtendedBindingProperties kafkaExtendedBindingProperties;
// @Autowired
// private ProducerListener producerListener;
@Bean
KafkaBinderConfigurationProperties configurationProperties(KafkaProperties kafkaProperties) {
return new KafkaBinderConfigurationProperties();
}
@Bean
KafkaTopicProvisioner provisioningProvider(KafkaBinderConfigurationProperties configurationProperties) {
return new KafkaTopicProvisioner(configurationProperties, new Kafka10AdminUtilsOperation());
}
@Bean
KafkaMessageChannelBinder kafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
KafkaMessageChannelBinder kafkaMessageChannelBinder = new KafkaMessageChannelBinder(configurationProperties,
provisioningProvider);
// kafkaMessageChannelBinder.setProducerListener(producerListener);
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
return kafkaMessageChannelBinder;
}
@Bean
public KafkaJaasLoginModuleInitializer jaasInitializer() throws IOException {
return new KafkaJaasLoginModuleInitializer();
}
}
我添加的应用属性:
App properties I have added:
management.health.binders.enabled = true,management.health.kafka.enabled = true
management.health.binders.enabled = true, management.health.kafka.enabled = true
============输出============当我在本地启动我的应用程序并点击/health 端点时,我看到 kafka 的以下内容:
===========OUTPUT============= When I launch my app locally and hit the /health endpoint, I see the following for kafka:
"binders": {
"status": "UNKNOWN",
"kafka": {
"status": "UNKNOWN"
}
},
推荐答案
问题已通过使用最新版本的 'spring-cloud-stream-binder-kafka' 解决.我最初使用的是旧版本(版本早于 1.3.0.RELEASE),并且 kafka 的运行状况检查不起作用.正如@Sobychacko 所建议的那样,我使用了最新版本 2.0.0 REALEASE,并且 kafka 绑定器的运行状况检查运行良好 :) 没有自定义运行状况指示 bean.
Issue was resolved by using the latest version of 'spring-cloud-stream-binder-kafka'. I was using an older version (version older than 1.3.0.RELEASE) initially and the health check for kafka wasnt working. As @Sobychacko suggested, I used the latest version, 2.0.0 REALEASE and the health check for kafka binders was woking fine :) with no custom health indicator beans.
粘合剂":{"状态": "UP",卡夫卡":{"状态": "UP",健康指标":{状态":向上"}}},
"binders": { "status": "UP", "kafka": { "status": "UP", "healthIndicator": { "status": "UP" } } },
此检查也适用于版本 1.3.0.RELEASE
This check should also work with version 1.3.0.RELEASE
这篇关于Spring-Cloud-Stream-Kafka 自定义健康检查不起作用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!