Monitoring the number of consumers for a Kafka topic


Question

We are using Prometheus and Grafana to monitor our Kafka cluster.

Our application uses Kafka Streams, and there is a chance that a stream stops because of an exception. We log the event through setUncaughtExceptionHandler, but we also need some kind of alert when the stream stops.

What we currently have is jmx_exporter running as an agent and exposing Kafka metrics through an endpoint, which Prometheus scrapes.

We don't see any metric that gives the count of active consumers per topic. Are we missing something? Any suggestions on how to get the number of active consumers and send an alert when a consumer stops?

Recommended answer

We had a similar need and added Kafka consumer lag per partition to Grafana, with an alert that fires when the lag exceeds a specified threshold. The threshold should be set per topic according to load: for some topics it could be 10, for heavily loaded ones 100000. For example, with a threshold of 1000, you get an alert once more than 1000 messages are unprocessed.
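To make the lag check concrete, here is a minimal sketch of the arithmetic behind such an alert. It assumes the end offsets and committed offsets per partition have already been fetched elsewhere (for example through Kafka's AdminClient or an exporter). The class LagCheck and its method names are hypothetical and not part of any Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: per-partition lag and a simple threshold check. */
final class LagCheck {

    /** Lag per partition = log end offset minus committed offset, floored at 0. */
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> endOffsets,
                                              Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            // Assumption: a partition with no committed offset counts from offset 0.
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - committed));
        }
        return lag;
    }

    /** True if any partition's lag is strictly above the topic's threshold. */
    static boolean shouldAlert(Map<Integer, Long> lag, long threshold) {
        for (long value : lag.values()) {
            if (value > threshold) {
                return true;
            }
        }
        return false;
    }
}
```

Because the threshold is passed in per call, a lightly loaded topic can use 10 while a heavily loaded one uses 100000, as described above.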

You can also add a state listener to each Kafka Streams instance and, when the stream enters an error state, log an error or send an email:

kafkaStream.setStateListener((newState, oldState) -> {
    log.info("Kafka stream state changed [{}] >>>>> [{}]", oldState, newState);
    if (newState == KafkaStreams.State.ERROR || newState == KafkaStreams.State.PENDING_SHUTDOWN) {
        log.error("Kafka Stream is in [{}] state. Application should be restarted", newState);
    }
});

You could also add a health check (e.g. via a REST endpoint or a Spring Boot HealthIndicator) that reports whether the stream is running:

KafkaStreams.State streamState = kafkaStream.state();
boolean running = streamState.isRunning();
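As a rough illustration of the health check idea, the sketch below maps a "is the stream running" check to an HTTP-style status code and body that a REST endpoint could return. The class StreamHealthCheck, its method names, and the JSON body are assumptions made for this example. The class deliberately takes a plain BooleanSupplier so it has no Kafka dependency.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical health check: reports UP/DOWN based on a supplied running check. */
final class StreamHealthCheck {

    private final BooleanSupplier running;

    StreamHealthCheck(BooleanSupplier running) {
        this.running = running;
    }

    /** 200 when the stream reports running, 503 (service unavailable) otherwise. */
    int httpStatus() {
        return running.getAsBoolean() ? 200 : 503;
    }

    /** Small JSON body for a monitoring probe to read. */
    String body() {
        return running.getAsBoolean() ? "{\"status\":\"UP\"}" : "{\"status\":\"DOWN\"}";
    }
}
```

In the application it could be wired as new StreamHealthCheck(() -> kafkaStream.state().isRunning()), and a probe on the endpoint can then raise an alert whenever it sees a 503.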

I also haven't found any Kafka Streams metrics that report active consumers or connected partitions. It would be nice if Kafka Streams provided such data, and I hope it becomes available in a future release.
