Aug 2019 - Finding Kafka Consumer Lag Programmatically


Question

Is there any way to find Kafka consumer lag programmatically? I don't want to install an external Kafka Manager tool and check a dashboard.

We can list all the consumer groups and check the lag for each group.

Currently we have a command to check the lag, but it requires the relative path where Kafka resides.

Spring-Kafka, kafka-python, the Kafka Admin client, or JMX: is there any way to find the lag in code with one of these?

We were careless and didn't monitor the process. The consumer was in a zombie state and the lag went to 50,000, which caused a lot of chaos.

We only think about these cases once a problem arises: we were monitoring the script but did not know it would end up as a zombie process.

Any ideas are very welcome!

Recommended answer

You can get this with kafka-python. Run the script on each broker, or loop through the list of brokers: a broker only lists the groups it coordinates, so together the runs report the consumer lag for every topic partition.

import socket

# Import paths as found in kafka-python 1.4.x / 2.0.x.
from kafka import KafkaConsumer, TopicPartition
from kafka.conn import BrokerConnection
from kafka.protocol.admin import DescribeGroupsRequest_v1, ListGroupsRequest_v1
from kafka.protocol.group import MemberAssignment

# Connect to the broker running on this host (assumes the default port 9092).
BOOTSTRAP_SERVERS = socket.gethostbyname(socket.gethostname())
client = BrokerConnection(BOOTSTRAP_SERVERS, 9092, socket.AF_INET)
client.connect_blocking()


def send_and_wait(request):
    """Send a request on the broker connection and block until its response arrives."""
    future = client.send(request)
    while not future.is_done:
        for resp, f in client.recv():
            f.success(resp)
    return future.value


for group_id, protocol_type in send_and_wait(ListGroupsRequest_v1()).groups:
    if protocol_type != 'consumer':
        continue
    described = send_and_wait(DescribeGroupsRequest_v1(groups=[group_id]))
    (error_code, _, state, _, protocol, members) = described.groups[0]

    # Collect each topic once, even when several members consume it.
    topics = set()
    for member in members:
        (member_id, client_id, client_host, member_metadata, member_assignment) = member
        if not member_assignment:
            continue  # no assignment yet, e.g. during a rebalance
        for (topic, partitions) in MemberAssignment.decode(member_assignment).assignment:
            topics.add(topic)
    if not topics:
        continue

    # Use the group's own id so committed() reads its offsets.
    # assign() is manual assignment, so this consumer does not join the group.
    consumer = KafkaConsumer(
        bootstrap_servers=BOOTSTRAP_SERVERS,
        group_id=group_id,
        enable_auto_commit=False
    )
    consumer.topics()  # force a metadata fetch before partitions_for_topic()

    for topic in sorted(topics):
        for p in consumer.partitions_for_topic(topic) or ():
            tp = TopicPartition(topic, p)
            consumer.assign([tp])
            committed = consumer.committed(tp)
            consumer.seek_to_end(tp)
            last_offset = consumer.position(tp)
            if last_offset is not None and committed is not None:
                lag = last_offset - committed
                print("group: {} topic: {} partition: {} lag: {}".format(group_id, topic, p, lag))

    consumer.close(autocommit=False)

client.close()
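
The question also mentions the Kafka Admin client. As a rough alternative, here is a minimal sketch that assumes a kafka-python release recent enough to ship `KafkaAdminClient.list_consumer_groups()` and `KafkaAdminClient.list_consumer_group_offsets()`. The helper names `compute_lag`, `over_threshold`, and `group_lag`, and the idea of an alert threshold, are illustrative assumptions and not part of the original answer.

```python
def compute_lag(committed, end_offsets):
    """Return {partition: lag} for partitions that have a committed offset.

    Both arguments map a partition key to an integer offset.
    """
    lag = {}
    for tp, committed_offset in committed.items():
        end = end_offsets.get(tp)
        if end is None or committed_offset is None or committed_offset < 0:
            continue  # end offset unknown, or nothing committed yet
        lag[tp] = max(end - committed_offset, 0)
    return lag


def over_threshold(lag, threshold):
    """Return the partitions whose lag exceeds a (hypothetical) alert threshold."""
    return {tp: n for tp, n in lag.items() if n > threshold}


def group_lag(bootstrap_servers):
    """Collect lag per consumer group; needs a reachable Kafka cluster."""
    # Imported lazily so the pure helpers above work without kafka-python.
    from kafka import KafkaAdminClient, KafkaConsumer

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    try:
        result = {}
        for group_id, _protocol_type in admin.list_consumer_groups():
            offsets = admin.list_consumer_group_offsets(group_id)
            committed = {tp: meta.offset for tp, meta in offsets.items()}
            if not committed:
                continue  # group has never committed an offset
            end_offsets = consumer.end_offsets(list(committed))
            result[group_id] = compute_lag(committed, end_offsets)
        return result
    finally:
        consumer.close()
        admin.close()
```

Something like `over_threshold(lag, 50000)` applied to each group's result from `group_lag("localhost:9092")` could then feed whatever alerting is already in place, so a stalled consumer is noticed before the lag grows unchecked. The address and the threshold are placeholders.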
