Kafka Consumer Group Id and consumer rebalance issue


Question

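I am using Kafka 0.10.0 and ZooKeeper 3.4.6 on my production servers. I have 20 topics, each with about 50 partitions, and 100 consumers in total, each subscribed to different topics and partitions. All consumers share the same group id. If a consumer is added or removed for one particular topic, will the consumers attached to other topics also go through a rebalance?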

My consumer code is:

public static void main(String[] args) {
        String groupId = "prod"
        String topicRegex = args[0]
        String consumerTimeOut = "10000"
        int n_threads = 1
        if (args && args.size() > 1) {
            ConfigLoader.init(args[1])
        }
        else {
            ConfigLoader.init('development')
        }
        if(args && args.size() > 2 && args[2].isInteger()){
            n_threads = (args[2]).toInteger()
        }

        ExecutorService executor = Executors.newFixedThreadPool(n_threads)
        addShutdownHook(executor)
        String zooKeeper = ConfigLoader.conf.zookeeper.hostName
        List<Runnable> taskList = []
        for(int i = 0; i < n_threads; i++){
            KafkaConsumer example = new KafkaConsumer(zooKeeper, groupId, topicRegex, consumerTimeOut)
            taskList.add(example)
        }
        taskList.each{ task ->
            executor.submit(task)
        }
        executor.shutdown()
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)
    }

private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId, String consumerTimeOut) {

        Properties props = new Properties()
        props.put("zookeeper.connect", a_zookeeper)
        props.put("group.id", a_groupId)
        props.put("zookeeper.session.timeout.ms", "10000")
        props.put("rebalance.backoff.ms","10000")
        props.put("zookeeper.sync.time.ms","200")
        props.put("rebalance.max.retries","10")
        props.put("enable.auto.commit", "false")
        props.put("consumer.timeout.ms", consumerTimeOut)
        props.put("auto.offset.reset", "smallest")
        return new ConsumerConfig(props)

    }

public void run(String topicRegex) {
        String threadName = Thread.currentThread().getName()
        logger.info("{} [{}] main Starting", TAG, threadName)
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>()
        List<KafkaStream<byte[], byte[]>> streams = consumer.createMessageStreamsByFilter(new Whitelist(topicRegex),1)
        ConsumerConnector consumerConnector = consumer

        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator()
            List<Object> batchTypeObjList = []
            String topic
            String topicObjectType
            String method
            String className
            String deserialzer
            Integer batchSize = 200
            while (true){
                boolean hasNext = false
                try {
                    hasNext = consumerIterator.hasNext()
                } catch (InterruptedException interruptedException) {
                    //if (exception instanceof InterruptedException) {
                    logger.error("{} [{}]Interrupted Exception: {}", TAG, threadName, interruptedException.getMessage())
                    throw interruptedException
                    //} else {
                } catch(ConsumerTimeoutException timeoutException){
                    logger.error("{} [{}] Timeout Exception: {}", TAG, threadName, timeoutException.getMessage())
                    topicListMap.each{ eachTopic, value ->
                        batchTypeObjList = topicListMap.get(eachTopic)
                        if(batchTypeObjList != null && !batchTypeObjList.isEmpty()) {
                            def dbObject = topicConfigMap.get(eachTopic)
                            logger.debug("{} [{}] Timeout Happened.. Indexing remaining objects in list for topic: {}", TAG, threadName, eachTopic)
                            className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                            method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                            int sleepTime = 0
                            if(dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null)
                                sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger()
                            executeMethod(className, method, batchTypeObjList)
                            batchTypeObjList.clear()
                            topicListMap.put(eachTopic,batchTypeObjList)
                            sleep(sleepTime)
                        }
                    }
                    consumer.commitOffsets()
                    continue
                } catch(Exception exception){
                    logger.error("{} [{}]Exception: {}", TAG, threadName, exception.getMessage())
                    throw exception
                }
                if(hasNext) {
                    def consumerObj = consumerIterator.next()
                    logger.debug("{} [{}] partition name: {}", TAG, threadName, consumerObj.partition())
                    topic = consumerObj.topic()
                    DBObject dbObject = topicConfigMap.get(topic)
                    logger.debug("{} [{}] topic name: {}", TAG, threadName, topic)
                    topicObjectType = dbObject.get(KafkaTopicConfigEntity.TOPIC_OBJECT_TYPE_KEY)
                    deserialzer = KafkaConfig.DEFAULT_DESERIALIZER
                    if(KafkaConfig.DESERIALIZER_MAP.containsKey(topicObjectType)){
                        deserialzer = KafkaConfig.DESERIALIZER_MAP.get(topicObjectType)
                    }
                    className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                    method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                    boolean isBatchJob = dbObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                    if(dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY) != null)
                        batchSize = dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY)
                    else
                        batchSize = 1
                    Object queueObj = (Class.forName(deserialzer)).deserialize(consumerObj.message())
                    int sleepTime = 0
                    if(dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null)
                        sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger()
                    if(isBatchJob == true){
                        batchTypeObjList = topicListMap.get(topic)
                        batchTypeObjList.add(queueObj)
                        if(batchTypeObjList.size() == batchSize) {
                            executeMethod(className, method, batchTypeObjList)
                            batchTypeObjList.clear()
                            sleep(sleepTime)
                        }
                        topicListMap.put(topic,batchTypeObjList)
                    } else {
                        executeMethod(className, method, queueObj)
                        sleep(sleepTime)
                    }
                    consumer.commitOffsets()
                }
            }
            logger.debug("{} [{}] Shutting Down Process ", TAG, threadName)
        }
    }

Any help will be appreciated.

Recommended answer

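Whenever a consumer leaves or joins a consumer group, the entire group undergoes a rebalance. Because the group tracks all partitions of all topics that its members subscribe to, you are right to think that a membership change can trigger a rebalance for consumers that are not subscribed to the topic in question.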

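The small test below illustrates this. I have one broker with two topics, test1 (2 partitions) and test2 (9 partitions), and I started two consumers with the same consumer group, each subscribed to only one of the two topics. As you can see, when consumer2 joins the group, consumer1 has all of its partitions revoked and reassigned, because the entire group rebalances.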

Subscribing consumer1 to topic test1
Starting thread for consumer1
Polling consumer1
consumer1 got 0 partitions revoked!
consumer1 got 2 partitions assigned!
Polling consumer1
Polling consumer1
Polling consumer1
Subscribing consumer2 to topic test2
Starting thread for consumer2
Polling consumer2
Polling consumer1
consumer2 got 0 partitions revoked!
Polling consumer1
Polling consumer1
consumer1 got 2 partitions revoked!
consumer2 got 9 partitions assigned!
consumer1 got 2 partitions assigned!
Polling consumer2
Polling consumer1
Polling consumer2
Polling consumer1
Polling consumer2
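
The reasoning behind the log above can be sketched with a toy model in plain Java. This is not Kafka's coordinator code and it does not use the Kafka client library: the class `GroupRebalanceModel`, its `join`/`leave` methods, and the per-topic round-robin assignment are all assumptions made for illustration, standing in for Kafka's real assignors. It only shows the group-level rule described in the answer: any membership change revokes and reassigns partitions for every member, regardless of which topic the new member subscribes to.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of group-wide rebalancing. Hypothetical illustration only,
// NOT the logic of Kafka's group coordinator or its partition assignors.
class GroupRebalanceModel {
    private final Map<String, Integer> partitionCounts;                             // topic -> partition count
    private final Map<String, Set<String>> subscriptions = new LinkedHashMap<>();   // member -> topics
    private final Map<String, List<String>> assignment = new LinkedHashMap<>();     // member -> owned partitions
    final List<String> log = new ArrayList<>();

    GroupRebalanceModel(Map<String, Integer> partitionCounts) {
        this.partitionCounts = new LinkedHashMap<>(partitionCounts);
    }

    // A member joining triggers a rebalance of the whole group.
    void join(String member, String... topics) {
        subscriptions.put(member, new LinkedHashSet<>(Arrays.asList(topics)));
        rebalance();
    }

    // A member leaving triggers a rebalance of the whole group as well.
    void leave(String member) {
        subscriptions.remove(member);
        assignment.remove(member);
        rebalance();
    }

    private void rebalance() {
        // Step 1: every current member gives up whatever it owns.
        for (String member : subscriptions.keySet()) {
            List<String> owned = assignment.get(member);
            int count = (owned == null) ? 0 : owned.size();
            log.add(member + " got " + count + " partitions revoked!");
        }
        assignment.clear();
        for (String member : subscriptions.keySet()) {
            assignment.put(member, new ArrayList<String>());
        }
        // Step 2: each topic's partitions are spread over the members
        // subscribed to that topic (simple round-robin stand-in).
        for (Map.Entry<String, Integer> topic : partitionCounts.entrySet()) {
            List<String> subscribers = new ArrayList<>();
            for (Map.Entry<String, Set<String>> s : subscriptions.entrySet()) {
                if (s.getValue().contains(topic.getKey())) {
                    subscribers.add(s.getKey());
                }
            }
            if (subscribers.isEmpty()) {
                continue; // nobody in the group consumes this topic
            }
            for (int p = 0; p < topic.getValue(); p++) {
                String owner = subscribers.get(p % subscribers.size());
                assignment.get(owner).add(topic.getKey() + "-" + p);
            }
        }
        // Step 3: every member is told its (possibly unchanged) assignment.
        for (String member : subscriptions.keySet()) {
            log.add(member + " got " + assignment.get(member).size() + " partitions assigned!");
        }
    }

    // Mirrors the setup of the test above: test1 has 2 partitions, test2 has 9.
    static List<String> demo() {
        Map<String, Integer> topics = new LinkedHashMap<>();
        topics.put("test1", 2);
        topics.put("test2", 9);
        GroupRebalanceModel group = new GroupRebalanceModel(topics);
        group.join("consumer1", "test1");
        group.join("consumer2", "test2");
        return group.log;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

Running `main` prints the revoke and assign events for both joins. In this model, consumer1 loses and regains its 2 partitions of test1 when consumer2 joins, even though consumer2 subscribes only to test2. The order of lines in a real log, like the one above, depends on when each consumer polls, so it will not match the model line for line.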
