Spring Actuator + Kafka Streams - 将 kafka 流状态添加到健康检查端点 [英] Spring Actuator + Kafka Streams - Add kafka stream status to health check endpoint

查看:37
本文介绍了Spring Actuator + Kafka Streams - 将 kafka 流状态添加到健康检查端点的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个使用 apache kafka-streams 的 Spring Boot 应用程序.我不使用 spring 云流.我添加了执行器健康检查端点.我在 application.yml 中这样配置:

I have a spring boot app where I use apache kafka-streams. I don't use spring cloud streams. I added actuator health check endpoint. I configured it in the application.yml like that:

management:
  health.db.enabled: false
  endpoints.web:
    base-path:
    path-mapping.health: /

当抛出运行时异常并且我的流被停止(如日志所示)但运行状况检查状态为 UP 时.

When a runtime exception was thrown and my stream was stopped as logs show but the health check status is UP.

2019-09-17 13:16:31.522 INFO 1 --- [Thread-5] org.apache.kafka.streams.KafkaStreams : 流客户端 [lpp-model-stream-7e6e8fea-fcad-4033-92a4-5ede50de6e17] Streams 客户端完全停止ely

如何将 kafka 流状态绑定到健康检查端点?

How to bind kafka stream status to health check endpoint ?

我的 pom.xml:

My pom.xml:

  <dependencies>
            <dependency>
                <groupId>org.springframework.kafka</groupId>
                <artifactId>spring-kafka</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
            </dependency>
            <dependency>
                <groupId>data-wizards</groupId>
                <artifactId>lpp-common-avro</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <dependency>
                <groupId>io.confluent</groupId>
                <artifactId>kafka-streams-avro-serde</artifactId>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
            </dependency>
            <dependency>
                <groupId>io.vavr</groupId>
                <artifactId>vavr</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>

链接到我创建流的代码:https://gist.github.com/solveretur/fc4fdd6c7663dc4d58fe72d48029f9c3

Link to the code where I create the stream: https://gist.github.com/solveretur/fc4fdd6c7663dc4d58fe72d48029f9c3

推荐答案

KafkaStreams 维护一个内存中 State,它可以映射到 Actuator 的健康状态.状态可以是以下之一:CREATEDERRORNOT_RUNNINGPENDING_SHUTDOWNREBALANCING, RUNNING - 它们是不言自明的.请参阅状态转换文档 https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/KafkaStreams.State.html

KafkaStreams maintain an in-memory State which could be mapped to Actuator's health statuses. State could be one of following:CREATED, ERROR, NOT_RUNNING, PENDING_SHUTDOWN, REBALANCING, RUNNING - they are self-explanatory. See docs for state transitions https://kafka.apache.org/11/javadoc/org/apache/kafka/streams/KafkaStreams.State.html

如果您正在寻找一个完整的示例,您可以使用以下示例并根据您的需要对其进行更新(例如,您可能不会将 CREATED 视为状态 UP).确保在应用程序上下文中有一个 KafkaStreams 类型的 bean.

If you're looking for a complete example, you could take following one and update it according to your needs (e.g. you may not count CREATED as status UP). Make sure you have a bean of type KafkaStreams in application context.

//Note that class name prefix before `HealthIndicator` will be camel-cased
//and used as a health component name, `kafkaStreams` here
@Component
public class KafkaStreamsHealthIndicator implements HealthIndicator {

    //if you have multiple instances, inject as Map<String, KafkaStreams>
    //Spring will map KafkaStreams instances by bean names present in context
    //so you can provide status details for each stream by name
    @Autowired
    private KafkaStreams kafkaStreams; 

    @Override
    public Health health() {
        State kafkaStreamsState = kafkaStreams.state();

        // CREATED, RUNNING or REBALANCING
        if (kafkaStreamsState == State.CREATED || kafkaStreamsState.isRunning()) {
            //set details if you need one
            return Health.up().build();
        }

        // ERROR, NOT_RUNNING, PENDING_SHUTDOWN, 
        return Health.down().withDetail("state", kafkaStreamsState.name()).build();
    }
}

然后健康端点将显示如下:

Then the health endpoint will display it like:

{
    "status": "UP",
    "kafkaStreams": {
        "status": "DOWN",
        "details": {  //not included if "UP"
            "state": "NOT_RUNNING"
        }
    }
}

这篇关于Spring Actuator + Kafka Streams - 将 kafka 流状态添加到健康检查端点的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆