CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member


Problem Description

I was using Kafka 0.10.2 and have now run into a CommitFailedException like:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

I have set max.poll.interval.ms to Integer.MAX_VALUE, so can anyone tell me why this still happens even though I have set that value?

Another question: I set session.timeout.ms to 60000 as the message describes, and it still happens. I tried to reproduce it with this simple code:

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.log4j.Logger;

    public class KafkaConsumer10 {
        public static void main(String[] args) throws InterruptedException {
            Logger logger = Logger.getLogger(KafkaConsumer10.class);
            logger.info("XX");
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka-broker:9098");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("max.poll.interval.ms", "300000");
            props.put("session.timeout.ms", "10000");
            props.put("max.poll.records", "2");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("t1"));
            while (true) {
                Thread.sleep(11000);
                ConsumerRecords<String, String> records = consumer.poll(100);
                Thread.sleep(11000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }

When I set session.timeout.ms to 10000 and sleep for more than 10000 ms inside my poll loop, it seems to work and no exception is thrown, so I'm confused about this. If the heartbeat is triggered by consumer.poll and consumer.commit, the heartbeat in my code should fall outside the session timeout. Why is no CommitFailedException thrown?

Recommended Answer

session.timeout.ms set on the consumer should be less than the group.max.session.timeout.ms set on the Kafka broker.

This resolved the issue for me.
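As a sketch of that fix (the broker values shown are assumptions based on the 0.10.x defaults, not taken from the original answer), the consumer's session timeout just needs to fall inside the range the broker allows:

    # Consumer side: session.timeout.ms must lie inside the broker's
    # allowed range [group.min.session.timeout.ms, group.max.session.timeout.ms].
    session.timeout.ms=60000
    # heartbeat.interval.ms is conventionally about one third of session.timeout.ms.
    heartbeat.interval.ms=20000

    # Broker side (server.properties), assumed 0.10.x defaults:
    # group.min.session.timeout.ms=6000
    # group.max.session.timeout.ms=300000

If the requested session.timeout.ms exceeds the broker's configured maximum, the consumer is rejected from the group and subsequent commits fail.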

Credit to the GitHub link Commit Failures.
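A side note on the second question above, not part of the original answer but consistent with it: starting with Kafka 0.10.1 (KIP-62), heartbeats are sent from a background thread rather than from poll(), so sleeping inside the poll loop does not by itself miss heartbeats; only the gap between successive poll() calls is bounded, by max.poll.interval.ms. The two timeouts therefore guard different things:

    # Bounds the gap between heartbeats from the consumer's background
    # thread; expiry makes the broker consider the consumer dead.
    session.timeout.ms=10000
    # Bounds the gap between successive poll() calls; exceeding it makes
    # the consumer leave the group, so a later commit fails with
    # CommitFailedException.
    max.poll.interval.ms=300000

That is why sleeping 11000 ms in the loop with session.timeout.ms=10000 raised no exception: the heartbeat thread kept the session alive, and the poll gap stayed under max.poll.interval.ms.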

