Asynchronous auto-commit of offsets fails


Problem Description

I have a question on Kafka auto-commit mechanism. I'm using Spring-Kafka with auto-commit enabled. As an experiment, I disconnected my consumer's connection to Kafka for 30 seconds while the system was idle (no new messages in the topic, no messages being processed). After reconnecting I got a few messages like so:

Asynchronous auto-commit of offsets {cs-1915-2553221872080030-0=OffsetAndMetadata{offset=19, leaderEpoch=0, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

First, I don't understand what is there to commit? The system was idle (all previous messages were already committed). Second, the disconnection time was 30 seconds, much less than the 5-minute (300000 ms) max.poll.interval.ms. Third, in an uncontrolled failure of Kafka I got at least 30K messages of this type, which was resolved by restarting the process. Why is this happening?

Here is my consumer configuration:

allow.auto.create.topics = true
        auto.commit.interval.ms = 100
        auto.offset.reset = latest
        bootstrap.servers = [kafka1-eu.dev.com:9094, kafka2-eu.dev.com:9094, kafka3-eu.dev.com:9094]
        check.crcs = true
        client.dns.lookup = default
        client.id =
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = feature-cs-1915-2553221872080030
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = SSL
        send.buffer.bytes = 131072
        session.timeout.ms = 15000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = [hidden]
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = /home/me/feature-2553221872080030.keystore
        ssl.keystore.password = [hidden]
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = /home/me/feature-2553221872080030.truststore
        ssl.truststore.password = [hidden]
        ssl.truststore.type = JKS
        value.deserializer = class org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2

Answer

First, I don't understand what is there to commit?

You are right, there is nothing new to commit if no new data is flowing. However, with auto-commit enabled and your consumer still running (even without being able to connect to the broker), the poll method is still responsible for the following steps:

  • Fetching messages from the assigned partitions
  • Triggering partition assignment (if necessary)
  • Committing offsets, if auto offset commit is enabled

Together with your interval of 100 ms (see auto.commit.interval.ms), the consumer therefore keeps trying to asynchronously commit the (unchanged) offset position of the consumer.
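For illustration, here is a minimal sketch of a plain-Java poll loop with auto-commit enabled. Spring-Kafka's listener container runs an equivalent loop internally; the topic name my-topic is hypothetical, the broker and group id are taken from the configuration above, and the SSL settings are omitted for brevity.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutoCommitPollLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        // broker list and group id taken from the question's configuration
        props.put("bootstrap.servers", "kafka1-eu.dev.com:9094");
        props.put("group.id", "feature-cs-1915-2553221872080030");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "100");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic")); // hypothetical topic name
            while (true) {
                // poll() fetches records, takes part in group coordination and,
                // because auto-commit is enabled, asynchronously commits the current
                // position (changed or not) once auto.commit.interval.ms has elapsed.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}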

Second, the disconnection time was 30 seconds, much less than the 5-minute (300000 ms) max.poll.interval.ms.

It is not max.poll.interval.ms that is causing the rebalance, but rather the combination of your heartbeat.interval.ms and session.timeout.ms settings. Your consumer sends heartbeats from a background thread at the configured interval, in your case every 3 seconds. If the broker receives no heartbeat before the session timeout expires (15 seconds in your case), it removes the client from the group and initiates a rebalance.
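As a sketch of where these two settings live in a Spring-Kafka setup, the consumer factory below sets them explicitly; the class name is illustrative, and the broker and group id values are taken from the question's configuration.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public class HeartbeatTuningExample {
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1-eu.dev.com:9094");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "feature-cs-1915-2553221872080030");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Heartbeats are sent from a background thread every heartbeat.interval.ms;
        // the broker evicts the member and rebalances if none arrives within session.timeout.ms.
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        return new DefaultKafkaConsumerFactory<>(props);
    }
}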

A more detailed description of the configurations I mentioned is given in the Kafka documentation on Consumer Configs.

Third, in an uncontrolled failure of Kafka I got at least 30K messages of this type, which was resolved by restarting the process. Why is this happening?

That seems to be a combination of the first two issues: heartbeats cannot be sent, while the consumer keeps trying to commit through the continuously called poll method.

As @GaryRussell mentioned in his comment, I would be careful with enable.auto.commit and would rather recommend taking control of offset management yourself.
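As a sketch of what manual offset management could look like with Spring-Kafka: set enable.auto.commit=false, configure the listener container's ack mode as MANUAL, and acknowledge each record after processing. The topic name below is hypothetical; the group id is taken from the question's configuration.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ManualAckListener {

    // Requires enable.auto.commit=false and the listener container factory's
    // AckMode set to MANUAL (or MANUAL_IMMEDIATE).
    @KafkaListener(topics = "my-topic", groupId = "feature-cs-1915-2553221872080030") // hypothetical topic
    public void listen(String message, Acknowledgment ack) {
        // process the record first ...
        System.out.println("received: " + message);
        // ... then commit the offset explicitly, only after successful processing
        ack.acknowledge();
    }
}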
