如何监控Kafka消息的应用程序处理进行负载测试 [英] How to monitor application processing of Kafka messages for load testing

查看:23
本文介绍了如何监控Kafka消息的应用程序处理进行负载测试的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有一个应用程序(不是我的)从 Kafka 读取消息,对它们进行一些处理,并将记录存储在数据库中.我已经用 Java 编写了一个程序,该程序以给定的速率将消息写入队列.现在,它通过在测试运行结束时查询数据库来执行简单的性能测量,以确保记录输入 = 记录输出.但是,我想扩展它以定期检查队列以查看应用程序尚未处理的待处理消息数量,以查看它是否正在备份.

There is an application (not mine) that reads messages from Kafka, does some processing on them, and stores records in a database. I've put together a program in Java that writes messages into the queue at a given rate. Right now, it does a simple measure of performance by querying the database at the end of the test run to ensure that records in = records out. However, I'd like to expand it to periodically check the queue to see how many messages are pending that the application hasn't yet processed to see if it's getting backed up.

我认为我可以在 Zookeeper 中检查应用程序组 ID 的偏移量.我查看了 Kafka 文档,但它只提供了基本的消费者示例,并且 API 文档很少最好,所以我不知道如何找到这些信息.

I figure that I can check offset of the application's group ID in Zookeeper. I looked at the Kafka documentation, but it only gives basic consumer examples and the API documentation is sparse at best, so I'm not sure how to go about finding this information.

我需要调用哪些 API 才能找出应用程序当前在队列中的位置,以及该位置后面的队列中有多少消息?

What APIs to I need to call in order to find out where in the queue the application is currently at, and how many messages are in the queue behind that position?

我将 Kafka 2.10-0.8.2.1 与一个 Zookeeper 实例和三个 Kafka 实例一起使用,负载测试器使用的是 0.8.2.1 Java API.所讨论的主题有三个分区(每个 Kafka 服务器上有一个分区),但是为了测试目的,只有一个消费者.

I'm using Kafka 2.10-0.8.2.1 with a single Zookeeper instance and three Kafka instances, and the load tester is using the 0.8.2.1 Java API. The topic in question has three partitions (one on each Kafka server), however for the purpose of the test there is only a single consumer.

推荐答案

我建议查看 Kafka 中已经提供的工具(如果您需要直接调用 API,代码在 src 中可用).特别是

I would suggest looking at the already provided tools in Kafka (code is available in src if you need to call the API directly). In particular,

$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group consumer-group1 --zkconnect zkhost:zkport --topic topic1

将显示偏移和滞后:

consumer-group1,topic1,0-0 (Group,Topic,BrokerId-PartitionId)
Owner = consumer-group1-consumer1
Consumer offset = 70121994703
= 70,121,994,703 (65.31G)
Log size = 70122018287
= 70,122,018,287 (65.31G)
Consumer lag = 23584
= 23,584 (0.00G)

参考文献:

这篇关于如何监控Kafka消息的应用程序处理进行负载测试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆