Kafka CommitFailedException consumer exception


Problem Description

After creating multiple consumers (using the Kafka 0.9 Java API) and starting each thread, I'm getting the following exception:

Consumer has failed with exception: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
class com.messagehub.consumer.Consumer is shutting down.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:352)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:936)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:905)

The consumer then starts consuming messages normally. I would like to know what is causing this exception in order to fix it.

Recommended Answer

Also try tweaking the following parameters:

  • heartbeat.interval.ms - This tells Kafka how long to wait without a heartbeat before the consumer is considered "dead".
  • max.partition.fetch.bytes - This limits the amount of data (and therefore, roughly, the number of messages) the consumer receives per partition when polling.
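As a rough sketch, these settings can be supplied as plain consumer properties. The broker address, group id, and every value below are illustrative assumptions, not recommendations; session.timeout.ms is included alongside them because it is the window within which heartbeats must arrive:

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "my-group");                // assumed consumer group
        // Heartbeats should be sent well within the session timeout
        props.put("heartbeat.interval.ms", "3000");       // illustrative value
        props.put("session.timeout.ms", "30000");         // illustrative value
        // Cap the bytes fetched per partition, indirectly limiting messages per poll
        props.put("max.partition.fetch.bytes", "1048576"); // illustrative value
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps());
    }
}
```

These properties would then be passed to the KafkaConsumer constructor as usual.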

I noticed that rebalancing occurs if the consumer does not commit to Kafka before the heartbeat times out. If the commit happens only after the messages are processed, the time needed to process them dictates how these parameters must be set. So, decreasing the number of messages per poll and increasing the heartbeat time help to avoid rebalancing.
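The relationship can be sketched as a back-of-the-envelope check (all numbers are illustrative assumptions): if one poll returns up to N messages and each takes t milliseconds to process, the worst-case N × t must stay under the session timeout, or the group will rebalance before the commit lands.

```java
public class TimeoutBudget {
    // Worst-case processing time for one batch, in milliseconds
    static long worstCaseMs(int messagesPerPoll, int msPerMessage) {
        return (long) messagesPerPoll * msPerMessage;
    }

    public static void main(String[] args) {
        int messagesPerPoll = 500;    // illustrative batch size
        int msPerMessage = 80;        // illustrative per-message cost
        int sessionTimeoutMs = 30000; // assumed session.timeout.ms
        long worstCase = worstCaseMs(messagesPerPoll, msPerMessage); // 40000 ms
        System.out.println(worstCase > sessionTimeoutMs
                ? "risk of rebalance: shrink the batch or raise the timeout"
                : "within budget");
    }
}
```

With these assumed numbers the batch overruns the timeout, which is exactly the situation the parameter tweaks above are meant to prevent.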

Also consider using more partitions, so there will be more threads processing your data even with fewer messages per poll.

I wrote this small application to run tests. Hope it helps.

https://github.com/ajkret/kafka-sample

UPDATE

Kafka 0.10.x now offers a new parameter to control the number of messages received: max.poll.records - the maximum number of records returned in a single call to poll().
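On 0.10.x the cap can therefore be expressed directly in records instead of bytes; a minimal sketch (the value 100 is an illustrative assumption, not a recommendation):

```java
import java.util.Properties;

public class MaxPollRecordsSketch {
    // Build properties that cap how many records one poll() may return (Kafka 0.10.x+)
    static Properties withRecordCap(int cap) {
        Properties props = new Properties();
        props.put("max.poll.records", Integer.toString(cap));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withRecordCap(100).getProperty("max.poll.records"));
    }
}
```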

UPDATE

Kafka offers a way to pause the queue. While the queue is paused, you can process the messages in a separate thread while still calling KafkaConsumer.poll() to send heartbeats, then call KafkaConsumer.resume() after the processing is done. This way you mitigate rebalances caused by missing heartbeats. Here is an outline of what you can do:

while (true) {
    // Block until records arrive, then commit the offsets just fetched
    ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
    consumer.commitSync();

    // Pause all assigned partitions so later poll() calls only send
    // heartbeats without fetching more records
    consumer.pause(consumer.assignment());
    for (ConsumerRecord<String, String> record : records) {

        Future<Boolean> future = workers.submit(() -> {
            // Process the record here
            return true;
        });

        // While the worker is busy, keep polling so heartbeats are sent
        while (true) {
            try {
                if (future.get(1, TimeUnit.SECONDS) != null) {
                    break;
                }
            } catch (java.util.concurrent.TimeoutException e) {
                consumer.poll(0); // partitions are paused, so this only heartbeats
            } catch (InterruptedException | ExecutionException e) {
                // Processing failed or was interrupted; stop waiting on this record
                break;
            }
        }
    }

    consumer.resume(consumer.assignment());
}
