Aug 2019 - Kafka Consumer Lag programmatically

Question

Is there any way to find the lag of a Kafka consumer programmatically? I don't want to install external tools such as Kafka Manager and check a dashboard.

We can list all the consumer groups and check the lag of each group.
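For reference, the lag of a group on one partition is that partition's log-end offset minus the offset the group last committed; summing over the group's partitions is a common one-number summary. A minimal sketch of that arithmetic, assuming both offset maps have already been fetched from Kafka (the function names and the `(topic, partition)` keys are illustrative, not part of any Kafka library):

```python
from typing import Dict, Hashable, Optional


def partition_lag(end_offset: int, committed: Optional[int]) -> Optional[int]:
    """Lag of one partition: log-end offset minus the group's committed offset.

    Returns None when the group has no committed offset for the partition
    (None or a negative sentinel), since the lag is undefined in that case.
    """
    if committed is None or committed < 0:
        return None
    # Clamp at 0: the two offsets are read at slightly different moments.
    return max(end_offset - committed, 0)


def group_lag(end_offsets: Dict[Hashable, int],
              committed: Dict[Hashable, Optional[int]]) -> dict:
    """Per-partition lag and total lag for one consumer group."""
    per_partition = {}
    for tp, end in end_offsets.items():
        lag = partition_lag(end, committed.get(tp))
        if lag is not None:
            per_partition[tp] = lag
    return {"partitions": per_partition, "total": sum(per_partition.values())}


# Hypothetical offsets for a topic "orders" with two partitions.
print(group_lag({("orders", 0): 120, ("orders", 1): 80},
                {("orders", 0): 100, ("orders", 1): None}))
# → {'partitions': {('orders', 0): 20}, 'total': 20}
```

Partitions without a committed offset are left out rather than reported as zero lag, so a group that never committed does not look healthy by accident.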

Currently we do have a command to check the lag, but it requires the relative path to the Kafka installation.

Spring-Kafka, kafka-python, the Kafka Admin client, or JMX: is there any way to compute the lag in code?

We were careless and didn't monitor the process; the consumer went into a zombie state and the lag grew to 50,000, which caused a lot of chaos.

These cases only occur to us once an issue arises: we were monitoring the script, but didn't know it could end up as a zombie process.
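A consumer stuck in this kind of zombie state can be caught from offsets alone: sample the group's committed offsets twice, some minutes apart, and flag partitions whose committed offset did not move even though messages are waiting. A hedged sketch of that check; the sampling interval, the `max_lag` threshold of 10,000, and the function names are assumptions for illustration:

```python
from typing import Dict, Hashable, List


def stuck_partitions(before: Dict[Hashable, int],
                     after: Dict[Hashable, int],
                     end_offsets: Dict[Hashable, int]) -> List[Hashable]:
    """Partitions whose committed offset did not change between two samples
    although there are unconsumed messages (end offset > committed offset)."""
    stuck = []
    for tp, committed_now in after.items():
        committed_then = before.get(tp)
        if committed_then is None:
            continue  # no earlier sample to compare against
        has_backlog = end_offsets.get(tp, committed_now) > committed_now
        if committed_now == committed_then and has_backlog:
            stuck.append(tp)
    return stuck


def should_alert(total_lag: int, stuck: List[Hashable], max_lag: int = 10_000) -> bool:
    """Alert on a total lag above the (hypothetical) threshold or any stuck partition."""
    return total_lag > max_lag or bool(stuck)
```

Checking for "no progress with a backlog" catches a frozen consumer long before the lag reaches a large absolute number, while the threshold still covers a consumer that is alive but too slow.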

Any ideas are welcome!

Answer

You can get this with kafka-python. A ListGroups request only returns the groups coordinated by the broker it is sent to, so run the code below on each broker (or loop through the list of brokers); it prints the consumer lag for every topic partition.

import socket

from kafka import BrokerConnection, KafkaConsumer, TopicPartition
from kafka.protocol.admin import DescribeGroupsRequest_v1, ListGroupsRequest_v1
from kafka.protocol.group import MemberAssignment

# Connect to the broker running on this host (default port 9092).
BOOTSTRAP_SERVERS = socket.gethostbyname(socket.gethostname())
client = BrokerConnection(BOOTSTRAP_SERVERS, 9092, socket.AF_INET)
client.connect_blocking()


def send_and_wait(request):
    """Send a request on the raw connection and block until its response arrives."""
    future = client.send(request)
    while not future.is_done:
        for resp, f in client.recv():
            f.success(resp)
    return future.value


for group_id, protocol_type in send_and_wait(ListGroupsRequest_v1()).groups:
    if protocol_type != 'consumer':
        continue
    # Decode the group description only after the full response has arrived.
    error_code, _group, _state, _ptype, _protocol, members = send_and_wait(
        DescribeGroupsRequest_v1(groups=[group_id])).groups[0]
    if error_code:
        continue

    # Collect each topic once, even if several members share its partitions.
    topics = set()
    for _member_id, _client_id, _host, _metadata, assignment in members:
        if assignment:  # empty while the group is rebalancing
            for topic, _partitions in MemberAssignment.decode(assignment).assignment:
                topics.add(topic)

    for topic in topics:
        consumer = KafkaConsumer(
            bootstrap_servers=BOOTSTRAP_SERVERS,
            group_id=group_id,
            enable_auto_commit=False,
        )
        consumer.topics()  # forces a metadata fetch so partitions_for_topic() works

        for p in consumer.partitions_for_topic(topic) or ():
            tp = TopicPartition(topic, p)
            consumer.assign([tp])
            committed = consumer.committed(tp)  # last offset committed by the group
            consumer.seek_to_end(tp)
            last_offset = consumer.position(tp)  # log-end offset
            if last_offset is not None and committed is not None:
                lag = last_offset - committed
                print("group: {} topic: {} partition: {} lag: {}".format(group_id, topic, p, lag))

        consumer.close(autocommit=False)

client.close()
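The question also mentions the Kafka Admin client. kafka-python's `KafkaAdminClient.list_consumer_groups()` queries every broker itself, so the per-broker loop is not needed; committed offsets come from `list_consumer_group_offsets()` and log-end offsets from `KafkaConsumer.end_offsets()`. A minimal sketch of that alternative (not the answer's method); the function names, the default `localhost:9092` address, and skipping groups whose protocol type is not `"consumer"` are assumptions:

```python
from typing import Dict, Hashable, Tuple


def collect_group_lags(admin, consumer) -> Dict[Tuple[str, Hashable], int]:
    """Return {(group_id, topic_partition): lag} for every consumer group.

    `admin` must provide list_consumer_groups() and list_consumer_group_offsets()
    like kafka-python's KafkaAdminClient; `consumer` must provide end_offsets()
    like KafkaConsumer.
    """
    lags = {}
    for group_id, protocol_type in admin.list_consumer_groups():
        if protocol_type != "consumer":
            continue  # e.g. Kafka Connect groups use protocol type "connect"
        offsets = admin.list_consumer_group_offsets(group_id)
        # Keep partitions that have a real committed offset (-1 means "none").
        committed = {tp: meta.offset for tp, meta in offsets.items() if meta.offset >= 0}
        if not committed:
            continue
        end_offsets = consumer.end_offsets(list(committed))
        for tp, offset in committed.items():
            lags[(group_id, tp)] = max(end_offsets[tp] - offset, 0)
    return lags


def main(bootstrap_servers: str = "localhost:9092") -> None:
    # Imported here so collect_group_lags() can be exercised without kafka-python.
    from kafka import KafkaAdminClient, KafkaConsumer

    admin = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, enable_auto_commit=False)
    try:
        for (group_id, tp), lag in collect_group_lags(admin, consumer).items():
            print("group: {} topic: {} partition: {} lag: {}".format(
                group_id, tp.topic, tp.partition, lag))
    finally:
        consumer.close()
        admin.close()
```

Calling `main("broker1:9092")` from a cron job or an existing monitoring script would print one line per partition. Passing the admin and consumer objects in, rather than creating them inside `collect_group_lags()`, keeps the lag logic testable with stand-in objects.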