Kafka consumer, very long rebalances

Problem description

We are running a three-broker Kafka 0.10.0.1 cluster. We have a Java application that spawns many consumer threads consuming from different topics. For each topic we have specified a different consumer group.

Often, when this application is restarted, one or more consumer groups (CGs) take more than 5 minutes to receive their partition assignment. Until then, the consumers for that topic consume nothing. If I go to a Kafka broker, run consumer-groups.sh, and describe that particular CG, I see that it is rebalancing. In server.log I see lines like these:

Preparing to stabilize group otp-sms-consumer
Stabilized group otp-sms-consumer

There is usually a gap of about 5 minutes or more between these two log lines. On the consumer side, when I turn on trace-level logging, there is literally no activity during this pause. After a couple of minutes, a lot of activity starts. That topic holds time-critical data such as otp-sms, and we cannot tolerate such long delays. What could be the reason for such long rebalances?

This is our consumer configuration:

auto.commit.interval.ms = 3000
auto.offset.reset = latest
bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = otp-notifications-consumer
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 50
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 300000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /x/x/client.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

Please help.

Recommended answer

The rebalance timeout is equal to max.poll.interval.ms (5 minutes in your case). When a rebalance starts in a group, Kafka revokes the partitions of all consumers in that group. It then waits for all live consumers (those still sending heartbeats) to call poll() and send a JoinGroupRequest.

This wait ends in one of two ways: either the rebalance timeout expires, or all live consumers call poll() and Kafka assigns partitions to them.
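
The waiting rule described above can be modeled in a few lines. This is only a toy sketch, not Kafka's coordinator code: the class `RebalanceModel` and its method are hypothetical names, and it assumes every member keeps sending heartbeats, so that only the rebalance timeout bounds the wait.

```java
import java.util.Arrays;
import java.util.List;

public class RebalanceModel {
    /**
     * Toy model of the wait: the rebalance ends when the slowest live member
     * has rejoined, or when the rebalance timeout expires, whichever is first.
     * rejoinDelaysMs holds, per member, the time until its next poll().
     */
    public static long rebalanceDurationMs(List<Long> rejoinDelaysMs, long rebalanceTimeoutMs) {
        long slowest = 0L;
        for (long delay : rejoinDelaysMs) {
            slowest = Math.max(slowest, delay);
        }
        return Math.min(slowest, rebalanceTimeoutMs);
    }

    public static void main(String[] args) {
        long timeoutMs = 300_000L; // max.poll.interval.ms from the question

        // One member never gets back to poll(): the group waits out the full timeout.
        System.out.println(rebalanceDurationMs(Arrays.asList(200L, 500L, Long.MAX_VALUE), timeoutMs));

        // All members poll promptly: the rebalance finishes with the slowest one.
        System.out.println(rebalanceDurationMs(Arrays.asList(200L, 500L, 900L), timeoutMs));
    }
}
```

In this model a single slow member is enough to hold the whole group for the full 300000 ms, which matches the roughly 5-minute gap reported in the question.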

So in your case, one of your consumers is probably running a long process, and Kafka waits for that process to complete before assigning partitions.
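
If the work done between poll() calls is normally short, one possible way to bound this wait is to lower max.poll.interval.ms, since that value is also the rebalance timeout. The sketch below only builds the overriding properties with `java.util.Properties`, so it runs without the Kafka client on the classpath. The class name `BoundedRebalanceConfig`, the 30000 ms interval, and the max.poll.records value of 10 are illustrative assumptions, not values taken from the answer. The interval must stay above the longest time a single batch can take to process.

```java
import java.util.Properties;

public class BoundedRebalanceConfig {
    /**
     * Builds consumer overrides that cap the rebalance timeout.
     * maxProcessingMs is an assumed upper bound on the time between two poll() calls.
     */
    public static Properties consumerOverrides(long maxProcessingMs) {
        Properties props = new Properties();
        props.setProperty("group.id", "otp-notifications-consumer"); // group.id from the question
        // The rebalance timeout follows this value, so a smaller value shortens the worst-case wait.
        props.setProperty("max.poll.interval.ms", Long.toString(maxProcessingMs));
        // Smaller batches make it easier to get back to poll() in time (illustrative value).
        props.setProperty("max.poll.records", "10");
        return props;
    }

    public static void main(String[] args) {
        Properties overrides = consumerOverrides(30_000L);
        System.out.println(overrides.getProperty("max.poll.interval.ms"));
    }
}
```

These properties would be merged with the rest of the consumer configuration before it is passed to the consumer constructor.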

For more information, see the following:

Consumer groups are an essential mechanism of Kafka. They allow consumers to share load and elastically scale by dynamically assigning the partitions of topics to consumers. In our current model of consumer groups, whenever a rebalance happens every consumer from that group experiences downtime - their poll() calls block until every other consumer in the group calls poll(). That is due to the fact that every consumer needs to call JoinGroup in a rebalance scenario in order to confirm it is still in the group.

Today, if the client has configured max.poll.interval.ms to a large value, the group coordinator broker will take in an unlimited number of join group requests and the rebalance could therefore continue for an unbounded amount of time. (https://cwiki.apache.org/confluence/display/KAFKA/KIP-389%3A+Introduce+a+configurable+consumer+group+size+limit)

-

Since we give the client as much as max.poll.interval.ms to handle a batch of records, this is also the maximum time before a consumer can be expected to rejoin the group in the worst case. We therefore propose to set the rebalance timeout in the Java client to the same value configured with max.poll.interval.ms. When a rebalance begins, the background thread will continue sending heartbeats. The consumer will not rejoin the group until processing completes and the user calls poll(). From the coordinator's perspective, the consumer will not be removed from the group until either 1) their session timeout expires without receiving a heartbeat, or 2) the rebalance timeout expires.

(https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread)
