Kafka Consumer CommitFailedException

This article explains how to handle the Kafka consumer CommitFailedException; it may serve as a useful reference if you are facing the same problem.

Problem Description


I am working on a Kafka consumer program. Recently we deployed it in the PROD environment, where we faced the following issue:

[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - No. of records fetched: 1
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Discovered group coordinator opl-kafka-prd2-01:9092 (id: 2147483644 rack: null)
[kafka-coordinator-heartbeat-thread | otm-opl-group] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Attempt to heartbeat failed for since member id consumer-otm-opl-group-1-953dfa46-9ced-472f-b24f-36d78c6b940b is not valid.
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - Batch start offset: 9329428
[main] INFO com.cisco.kafka.consumer.RTRKafkaConsumer - Batch Processing Successful.
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-otm-opl-group-1, groupId=otm-opl-group] Failing OffsetCommit request since the consumer is not part of an active group
Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1061)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:936)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1387)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1349)
    at com.cisco.kafka.consumer.RTRKafkaConsumer.main(RTRKafkaConsumer.java:72)

My understanding is that by the time the group coordinator is unavailable and re-discovered, the heartbeat interval (3 seconds per the documentation) expires and the consumer is kicked out of the group. Is this correct? If so, what would be the workaround for this? If I'm wrong, please help me understand this issue and suggest any ideas you have to fix it. I can share the code if needed.

Solution

The Exception you are referring to

Exception in thread "main" org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

gives a hint on what is happening and what can be done to solve the problem. In the code this Exception is described as

"This exception is raised when an offset commit with KafkaConsumer#commitSync() fails with an unrecoverable error. This can happen when a group rebalance completes before the commit could be successfully applied. In this case, the commit cannot generally be retried because some of the partitions may have already been assigned to another member in the group."

In my experience, the thrown error message can be caused by different things, although they are all related to the consumer no longer being assigned to the partition:

  1. Creating more and more Consumers without closing them
  2. Timeout of poll
  3. Timeout of heartbeat
  4. Outdated Kerberos ticket

1. Opening more and more Consumers without closing them

A rebalance takes place if you add a consumer to an existing ConsumerGroup. Therefore, it is essential to close the consumer after usage, or to always use the same instance instead of creating a new KafkaConsumer object for every message/iteration.
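As a sketch of that pattern (reusing the placeholder props from the example above, plus org.apache.kafka.common.errors.WakeupException), create one long-lived consumer and close it cleanly on shutdown instead of constructing a new one per iteration:

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);  // created once, not per message
    Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));   // interrupts the blocking poll()
    try {
        consumer.subscribe(Collections.singletonList("example-topic"));
        while (true) {
            consumer.poll(Duration.ofSeconds(1)); // process records here
        }
    } catch (WakeupException e) {
        // expected during shutdown
    } finally {
        consumer.close(); // leaves the group cleanly instead of waiting for a session timeout
    }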

2. Poll Timeout (as explained in the Error message):

[...] that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing.

The configuration max.poll.interval.ms defaults to 300000 ms, i.e. 5 minutes. As your consumer is taking more than those 5 minutes, the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member (see Consumer Configuration).

Solution for Poll Timeout:

A possible solution is also given in the error message

You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

The consumer then reads all messages again, because (as the error shows) it is not able to commit the offsets. That means if you start a consumer with the same group.id, it thinks that it has never read anything from that topic.
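As an illustration only, those two knobs look like this in the consumer properties (the values are placeholders to adapt to your actual processing time, not recommendations):

    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000"); // allow up to 10 minutes between poll() calls
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");        // default is 500; smaller batches finish sooner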

3. Timeout of Heartbeat

There are two main configurations in your KafkaConsumer that deal with heartbeats: heartbeat.interval.ms and session.timeout.ms.

In a separate background thread your KafkaConsumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, the consumer is considered dead and its partitions are reassigned. If the rebalance is triggered, your consumer can't commit anything from an "old assigned" partition, as written in the description of the CommitFailedException: "This can happen when a group rebalance completes before the commit could be successfully applied."

Solution for Heartbeat Timeout:

Increase the settings heartbeat.interval.ms and session.timeout.ms while following the recommendation: "The heartbeat.interval.ms must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value."
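For illustration, a pair of placeholder values that respect the 1/3 rule (defaults differ between client versions, so check the consumer configuration docs for yours):

    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");    // consumer declared dead after 30 s without heartbeats
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000"); // exactly 1/3 of session.timeout.ms

Note that the broker only accepts session timeouts within the bounds set by its group.min.session.timeout.ms and group.max.session.timeout.ms.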

Just keep in mind that changing these values always comes with a trade-off. You have either

  • more frequent rebalances but shorter reaction time to identify dead consumers or
  • less frequent rebalances and longer reaction time to identify dead consumers.

4. Outdated Kerberos ticket

On our production cluster we have seen the CommitFailedException right after the application failed to renew its Kerberos ticket.
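The fix in that case is on the authentication side, not in the consumer loop. As a hedged example (assuming a SASL/GSSAPI setup; the security protocol, keytab path and principal below are placeholders), configuring the client to log in from a keytab lets the JVM re-login by itself instead of depending on an external kinit ticket cache that can expire:

    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); // or SASL_SSL
    props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka");
    props.put(SaslConfigs.SASL_JAAS_CONFIG,
        "com.sun.security.auth.module.Krb5LoginModule required "
        + "useKeyTab=true storeKey=true "
        + "keyTab=\"/etc/security/keytabs/consumer.keytab\" " // placeholder path
        + "principal=\"consumer@EXAMPLE.COM\";");             // placeholder principal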
