Asynchronous auto-commit of offsets fails
Question
I have a question on Kafka auto-commit mechanism. I'm using Spring-Kafka with auto-commit enabled. As an experiment, I disconnected my consumer's connection to Kafka for 30 seconds while the system was idle (no new messages in the topic, no messages being processed). After reconnecting I got a few messages like so:
Asynchronous auto-commit of offsets {cs-1915-2553221872080030-0=OffsetAndMetadata{offset=19, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
First, I don't understand what is there to commit? The system was idle (all previous messages were already committed). Second, the disconnection time was 30 seconds, much less than the 5-minute (300000 ms) max.poll.interval.ms. Third, in an uncontrolled failure of Kafka I got at least 30K messages of this type, which was resolved by restarting the process. Why is this happening?
I'm listing here my consumer configuration:
allow.auto.create.topics = true
auto.commit.interval.ms = 100
auto.offset.reset = latest
bootstrap.servers = [kafka1-eu.dev.com:9094, kafka2-eu.dev.com:9094, kafka3-eu.dev.com:9094]
check.crcs = true
client.dns.lookup = default
client.id =
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = feature-cs-1915-2553221872080030
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 15000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = [hidden]
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = /home/me/feature-2553221872080030.keystore
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /home/me/feature-2553221872080030.truststore
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
Answer
First, I don't understand what is there to commit?
You are right, there is nothing new to commit if no new data is flowing. However, with auto-commit enabled and your consumer still running (even without being able to connect to the broker), the poll method is still responsible for the following steps:
- fetching messages from the assigned partitions
- triggering a partition assignment (if necessary)
- committing offsets, if auto offset commit is enabled
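The steps above can be sketched with a plain KafkaConsumer poll loop (a sketch only: the broker address and topic name are placeholders, and kafka-clients must be on the classpath; it will not run without a reachable broker):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AutoCommitLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1-eu.dev.com:9094"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "feature-cs-1915-2553221872080030");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("some-topic")); // hypothetical topic name
            while (true) {
                // Each poll() fetches records, participates in rebalances, and
                // -- with auto-commit on -- asynchronously commits the current
                // position once auto.commit.interval.ms has elapsed, even if
                // no new records arrived in the meantime.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> System.out.printf("%s-%d@%d%n", r.topic(), r.partition(), r.offset()));
            }
        }
    }
}
```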
Together with your interval of 100 ms (see auto.commit.interval.ms), the consumer still tries to asynchronously commit the (unchanging) offset position of the consumer.
Second, the disconnection time was 30 seconds, much less than the 5-minute (300000 ms) max.poll.interval.ms.
It is not max.poll.interval.ms that is causing the rebalance, but rather the combination of your heartbeat.interval.ms and session.timeout.ms settings. Your consumer sends heartbeats from a background thread based on the heartbeat interval setting, in your case every 3 seconds. If the broker receives no heartbeat before the session timeout expires (in your case 15 seconds), it removes this client from the group and initiates a rebalance.
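If the goal is to survive a longer broker disconnect without being kicked from the group, both settings can be raised together (a sketch with example values, not a recommendation for your workload; Kafka's docs suggest keeping heartbeat.interval.ms at no more than roughly one third of session.timeout.ms):

```java
// Sketch: the two settings that govern group liveness, as opposed to
// max.poll.interval.ms, which only bounds the gap between poll() calls.
Properties props = new Properties();
props.put("session.timeout.ms", "45000");    // broker evicts the member after this much silence
props.put("heartbeat.interval.ms", "15000"); // background heartbeat cadence, ~1/3 of session timeout
```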
Third, in an uncontrolled failure of Kafka I got at least 30K messages of this type, which was resolved by restarting the process. Why is this happening?
That seems to be a combination of the first two questions: heartbeats cannot be sent, while the consumer still tries to commit through the continuously called poll method.
As @GaryRussell mentioned in his comment, I would be careful with enable.auto.commit and would rather take control of the offset management yourself.
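A minimal sketch of that alternative with a plain KafkaConsumer: disable auto-commit and commit explicitly only after a batch has been processed (broker address and topic name are placeholders; it needs kafka-clients on the classpath and a reachable broker):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1-eu.dev.com:9094"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "feature-cs-1915-2553221872080030");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // take over offset management
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("some-topic")); // hypothetical topic name
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                if (!records.isEmpty()) {
                    records.forEach(r -> System.out.println(r.value())); // stand-in for real processing
                    // Commit synchronously only after the batch was processed;
                    // nothing is committed when poll() returns no records.
                    consumer.commitSync();
                }
            }
        }
    }
}
```

In Spring-Kafka, as I understand it, the equivalent is setting the container's AckMode to MANUAL and calling Acknowledgment.acknowledge() in the listener rather than driving the loop yourself.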