Kafka CommitFailedException consumer exception

Problem description

After creating multiple consumers (using the Kafka 0.9 Java API) and starting each thread, I get the following exception:

Consumer has failed with exception: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
class com.messagehub.consumer.Consumer is shutting down.
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:546)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:487)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:681)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:654)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:350)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:288)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:303)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:197)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:187)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:157)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:352)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:936)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:905)

After that, the consumers start consuming messages normally. I would like to know what causes this exception so that I can fix it.

Recommended answer

Also try tuning the following parameters:

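  • session.timeout.ms and heartbeat.interval.ms - session.timeout.ms is how many milliseconds Kafka waits without receiving a heartbeat before it considers the consumer "dead" and rebalances the group; heartbeat.interval.ms is how often heartbeats are sent and must be lower than session.timeout.ms.
  • max.partition.fetch.bytes - limits the amount of data (and therefore the number of messages) the consumer receives per partition when polling.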
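
A minimal sketch of how these settings might be collected for the consumer, assuming they are passed as a java.util.Properties object (the KafkaConsumer constructor accepts one). The class name ConsumerTimingConfig and all numeric values are hypothetical, not recommendations; the check encodes the Kafka documentation's guidance that heartbeat.interval.ms be lower than session.timeout.ms and typically no higher than a third of it.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer timing/fetch settings as plain Properties.
class ConsumerTimingConfig {

    static Properties build(int sessionTimeoutMs, int heartbeatIntervalMs, int maxPartitionFetchBytes) {
        // Kafka docs: heartbeat.interval.ms must be lower than session.timeout.ms,
        // and typically no higher than one third of it.
        if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            throw new IllegalArgumentException(
                "heartbeat.interval.ms should be at most 1/3 of session.timeout.ms");
        }
        Properties props = new Properties();
        props.put("session.timeout.ms", String.valueOf(sessionTimeoutMs));
        props.put("heartbeat.interval.ms", String.valueOf(heartbeatIntervalMs));
        // Caps the bytes (and so, roughly, the records) fetched per partition per poll.
        props.put("max.partition.fetch.bytes", String.valueOf(maxPartitionFetchBytes));
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values: 30 s session timeout, 10 s heartbeats, 256 KiB per partition.
        Properties props = build(30_000, 10_000, 256 * 1024);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Keep in mind that the broker bounds session.timeout.ms with group.min.session.timeout.ms and group.max.session.timeout.ms, so a large session timeout may also need a broker-side change.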

I noticed that a rebalance occurs if the consumer does not poll or commit before the session timeout (session.timeout.ms) expires, because the 0.9 consumer has no background thread and only sends heartbeats while the application is calling into it (for example, poll()). If you commit after processing the messages, the time it takes to process a batch determines how these parameters must be set. So fetching fewer messages per poll and increasing the session timeout helps avoid rebalancing.
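
The reasoning above is simple arithmetic: the worst-case time to process one poll's worth of records has to stay below the session timeout. A rough sketch, assuming records of a roughly constant average size and a constant per-record processing cost (both hypothetical inputs), so that records per partition per poll is estimated as max.partition.fetch.bytes divided by the average record size. The class PollBudget and its methods are illustrative names.

```java
// Hypothetical estimator: does one poll's worth of work fit inside the session timeout?
class PollBudget {

    // Rough estimate of records one poll can return under max.partition.fetch.bytes.
    static long maxRecordsPerPoll(long maxPartitionFetchBytes, long avgRecordBytes, int assignedPartitions) {
        return (maxPartitionFetchBytes / avgRecordBytes) * assignedPartitions;
    }

    // Worst-case processing time for that batch, in milliseconds.
    static long worstCaseBatchMs(long records, long perRecordMs) {
        return records * perRecordMs;
    }

    // The batch must finish before the coordinator gives up on the consumer.
    static boolean fitsInSession(long batchMs, long sessionTimeoutMs) {
        return batchMs < sessionTimeoutMs;
    }

    public static void main(String[] args) {
        // Hypothetical workload: 1 MiB fetch limit, 1 KiB records, 4 partitions, 10 ms per record.
        long records = maxRecordsPerPoll(1_048_576, 1_024, 4);   // 1024 * 4 = 4096
        long batchMs = worstCaseBatchMs(records, 10);            // 40960 ms
        System.out.println(records + " records, " + batchMs + " ms, fits in 30 s: "
                + fitsInSession(batchMs, 30_000));
        // Shrinking the fetch limit to 256 KiB cuts the batch to 1024 records / 10240 ms.
        long smaller = maxRecordsPerPoll(262_144, 1_024, 4);
        System.out.println(smaller + " records, " + worstCaseBatchMs(smaller, 10) + " ms");
    }
}
```

With these made-up numbers, a 30 s session timeout is exceeded at the 1 MiB fetch limit but not at 256 KiB, which is the "fewer messages per poll" lever described above.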

Also consider using more partitions, so that more threads process your data, even with fewer messages per poll.
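
Partitions cap parallelism because each partition is assigned to at most one consumer in the group. A sketch of range-style assignment for a single topic (similar in spirit to Kafka's range strategy, but an illustration rather than the library's code; RangeAssignmentSketch is a hypothetical name): each consumer gets partitions / consumers partitions, the first partitions % consumers consumers get one extra, and any consumer beyond the partition count gets nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative range-style assignment of one topic's partitions to a group of consumers.
class RangeAssignmentSketch {

    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        int perConsumer = partitions / consumers;
        int withExtra = partitions % consumers;   // the first `withExtra` consumers get one more
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = perConsumer + (c < withExtra ? 1 : 0);
            List<Integer> mine = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                mine.add(next++);
            }
            result.add(mine);  // an empty list means an idle consumer thread
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 4)); // [[0, 1], [2, 3], [4], [5]]
        System.out.println(assign(2, 4)); // [[0], [1], [], []]: two threads sit idle
    }
}
```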

I wrote this small application for testing. Hope it helps.

https://github.com/ajkret/kafka-sample

Update

Kafka 0.10.x now offers a new parameter to control the number of messages received:

  • max.poll.records - the maximum number of records returned in a single call to poll().
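
Because max.poll.records bounds the batch by record count rather than by bytes, it can be sized directly from a time budget. A minimal sketch, assuming a hypothetical per-record processing cost and a safety factor; MaxPollRecordsSizing is an illustrative name, and the result is passed as an ordinary consumer property. The budget would be session.timeout.ms on 0.10.0, and max.poll.interval.ms on 0.10.1 and later, where heartbeats moved to a background thread.

```java
import java.util.Properties;

// Hypothetical helper: derive max.poll.records from a processing-time budget.
class MaxPollRecordsSizing {

    // Largest record count whose processing fits in budgetMs, scaled down by a safety factor.
    static int recordsForBudget(long budgetMs, long perRecordMs, double safetyFactor) {
        long fit = (long) (budgetMs * safetyFactor) / perRecordMs;
        return (int) Math.max(1, fit);  // always allow at least one record per poll
    }

    public static void main(String[] args) {
        // Hypothetical: 30 s budget, 50 ms per record, keep a 50% margin.
        int maxPollRecords = recordsForBudget(30_000, 50, 0.5);  // 15000 / 50 = 300
        Properties props = new Properties();
        props.put("max.poll.records", String.valueOf(maxPollRecords));
        System.out.println(props);  // {max.poll.records=300}
    }
}
```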

Update

Kafka also lets you pause consumption of the assigned partitions. While they are paused, you can process the messages in a separate thread and keep calling KafkaConsumer.poll() to send heartbeats (paused partitions return no records). Then call KafkaConsumer.resume() once processing is done. This mitigates rebalances caused by missed heartbeats. Here is an outline of what you can do:

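import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Assumes `consumer` is a subscribed KafkaConsumer<String, String>, `workers` is an
// ExecutorService, and the enclosing method declares InterruptedException and
// ExecutionException (both thrown by Future.get).
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE);
    // Committing before processing gives at-most-once delivery.
    consumer.commitSync();

    // Stop fetching from the assigned partitions while the batch is processed.
    // (Newer clients take a Collection; the 0.9 client takes TopicPartition varargs.)
    consumer.pause(consumer.assignment());
    for (ConsumerRecord<String, String> record : records) {
        Future<Boolean> future = workers.submit(() -> {
            // Process record
            return true;
        });

        // Wait for the worker; on each timeout, poll so heartbeats keep flowing.
        while (true) {
            try {
                future.get(1, TimeUnit.SECONDS);
                break;
            } catch (TimeoutException e) {
                consumer.poll(0); // paused partitions return no records
            }
        }
    }

    consumer.resume(consumer.assignment());
}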
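
The core of that loop, waiting on a worker's Future with a short timeout and doing keep-alive work on every timeout, can be tried without a broker. In this sketch the hypothetical `heartbeat` Runnable stands in for the paused consumer's poll(0); WaitWithHeartbeat is an illustrative name, and everything else is plain java.util.concurrent.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Simulation of "process on a worker thread, keep polling while waiting".
class WaitWithHeartbeat {

    // Blocks until the future completes, invoking heartbeat every time the wait times out.
    static <T> T awaitWithHeartbeat(Future<T> future, long waitMs, Runnable heartbeat)
            throws InterruptedException, ExecutionException {
        while (true) {
            try {
                return future.get(waitMs, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                heartbeat.run();  // in the Kafka loop: consumer.poll(0) on paused partitions
            }
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService workers = Executors.newSingleThreadExecutor();
        AtomicInteger beats = new AtomicInteger();
        Future<String> future = workers.submit(() -> {
            Thread.sleep(350);  // simulated slow record processing
            return "done";
        });
        String result = awaitWithHeartbeat(future, 100, beats::incrementAndGet);
        workers.shutdown();
        System.out.println(result + " after " + beats.get() + " heartbeats");
    }
}
```

Returning from inside the try block replaces the outline's separate break, and the worker's result (or its exception, wrapped in ExecutionException) reaches the caller directly.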
