Kafka Consumer Group Id and consumer rebalance issue
Problem Description
I am using Kafka 0.10.0 and ZooKeeper 3.4.6. I have 20 topics, each with roughly 50 partitions, and 100 consumers in total, each subscribed to different topics and partitions. All consumers use the same group ID. So if a consumer is added to or removed from a particular topic, will the consumers attached to other topics also go through a rebalance?
My consumer code is:
public static void main(String[] args) {
    String groupId = "prod"
    String topicRegex = args[0]
    String consumerTimeOut = "10000"
    int n_threads = 1
    if (args && args.size() > 1) {
        ConfigLoader.init(args[1])
    } else {
        ConfigLoader.init('development')
    }
    if (args && args.size() > 2 && args[2].isInteger()) {
        n_threads = (args[2]).toInteger()
    }
    ExecutorService executor = Executors.newFixedThreadPool(n_threads)
    addShutdownHook(executor)
    String zooKeeper = ConfigLoader.conf.zookeeper.hostName
    List<Runnable> taskList = []
    // One consumer task per thread, all sharing the same group id "prod"
    for (int i = 0; i < n_threads; i++) {
        KafkaConsumer example = new KafkaConsumer(zooKeeper, groupId, topicRegex, consumerTimeOut)
        taskList.add(example)
    }
    taskList.each { task ->
        executor.submit(task)
    }
    executor.shutdown()
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS)
}

// Configuration for the old (ZooKeeper-based) high-level consumer
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId, String consumerTimeOut) {
    Properties props = new Properties()
    props.put("zookeeper.connect", a_zookeeper)
    props.put("group.id", a_groupId)
    props.put("zookeeper.session.timeout.ms", "10000")
    props.put("rebalance.backoff.ms", "10000")
    props.put("zookeeper.sync.time.ms", "200")
    props.put("rebalance.max.retries", "10")
    props.put("enable.auto.commit", "false")
    props.put("consumer.timeout.ms", consumerTimeOut)
    props.put("auto.offset.reset", "smallest")
    return new ConsumerConfig(props)
}
public void run(String topicRegex) {
    String threadName = Thread.currentThread().getName()
    logger.info("{} [{}] main Starting", TAG, threadName)
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>()
    List<KafkaStream<byte[], byte[]>> streams = consumer.createMessageStreamsByFilter(new Whitelist(topicRegex), 1)
    ConsumerConnector consumerConnector = consumer
    for (final KafkaStream stream : streams) {
        ConsumerIterator<byte[], byte[]> consumerIterator = stream.iterator()
        List<Object> batchTypeObjList = []
        String topic
        String topicObjectType
        String method
        String className
        String deserializer
        Integer batchSize = 200
        while (true) {
            boolean hasNext = false
            try {
                hasNext = consumerIterator.hasNext()
            } catch (InterruptedException interruptedException) {
                logger.error("{} [{}] Interrupted Exception: {}", TAG, threadName, interruptedException.getMessage())
                throw interruptedException
            } catch (ConsumerTimeoutException timeoutException) {
                // consumer.timeout.ms expired without new messages: flush any partially filled batches
                logger.error("{} [{}] Timeout Exception: {}", TAG, threadName, timeoutException.getMessage())
                topicListMap.each { eachTopic, value ->
                    batchTypeObjList = topicListMap.get(eachTopic)
                    if (batchTypeObjList != null && !batchTypeObjList.isEmpty()) {
                        def dbObject = topicConfigMap.get(eachTopic)
                        logger.debug("{} [{}] Timeout Happened.. Indexing remaining objects in list for topic: {}", TAG, threadName, eachTopic)
                        className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                        method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                        int sleepTime = 0
                        if (dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null)
                            sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger()
                        executeMethod(className, method, batchTypeObjList)
                        batchTypeObjList.clear()
                        topicListMap.put(eachTopic, batchTypeObjList)
                        sleep(sleepTime)
                    }
                }
                consumer.commitOffsets()
                continue
            } catch (Exception exception) {
                logger.error("{} [{}] Exception: {}", TAG, threadName, exception.getMessage())
                throw exception
            }
            if (hasNext) {
                def consumerObj = consumerIterator.next()
                logger.debug("{} [{}] partition name: {}", TAG, threadName, consumerObj.partition())
                topic = consumerObj.topic()
                DBObject dbObject = topicConfigMap.get(topic)
                logger.debug("{} [{}] topic name: {}", TAG, threadName, topic)
                topicObjectType = dbObject.get(KafkaTopicConfigEntity.TOPIC_OBJECT_TYPE_KEY)
                deserializer = KafkaConfig.DEFAULT_DESERIALIZER
                if (KafkaConfig.DESERIALIZER_MAP.containsKey(topicObjectType)) {
                    deserializer = KafkaConfig.DESERIALIZER_MAP.get(topicObjectType)
                }
                className = dbObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                method = dbObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                boolean isBatchJob = dbObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                if (dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY) != null)
                    batchSize = dbObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY)
                else
                    batchSize = 1
                Object queueObj = (Class.forName(deserializer)).deserialize(consumerObj.message())
                int sleepTime = 0
                if (dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS) != null)
                    sleepTime = dbObject.get(KafkaTopicConfigEntity.CONUSMER_SLEEP_IN_MS)?.toInteger()
                if (isBatchJob == true) {
                    // Batch mode: buffer messages per topic and flush when the batch is full
                    batchTypeObjList = topicListMap.get(topic)
                    batchTypeObjList.add(queueObj)
                    if (batchTypeObjList.size() == batchSize) {
                        executeMethod(className, method, batchTypeObjList)
                        batchTypeObjList.clear()
                        sleep(sleepTime)
                    }
                    topicListMap.put(topic, batchTypeObjList)
                } else {
                    executeMethod(className, method, queueObj)
                    sleep(sleepTime)
                }
                consumer.commitOffsets()
            }
        }
        logger.debug("{} [{}] Shutting Down Process ", TAG, threadName)
    }
}
Any help would be appreciated.
Recommended Answer
Whenever a consumer leaves or joins a consumer group, the entire group goes through a rebalance. Since the group tracks all partitions, across all topics, that its members are subscribed to, your intuition is correct: this can trigger a rebalance for consumers that are not subscribed to the topic in question.
See the small test below to illustrate this. I have a broker with two topics, test1 (2 partitions) and test2 (9 partitions), and I start two consumers, both in the same consumer group, each subscribed to only one of the two topics. As you can see, when consumer2 joins the group, consumer1 has all of its partitions revoked and reassigned, because the whole group rebalances (a sketch of this kind of test follows the log output below).

Subscribing consumer1 to topic test1
Starting thread for consumer1
Polling consumer1
consumer1 got 0 partitions revoked!
consumer1 got 2 partitions assigned!
Polling consumer1
Polling consumer1
Polling consumer1
Subscribing consumer2 to topic test2
Starting thread for consumer2
Polling consumer2
Polling consumer1
consumer2 got 0 partitions revoked!
Polling consumer1
Polling consumer1
consumer1 got 2 partitions revoked!
consumer2 got 9 partitions assigned!
consumer1 got 2 partitions assigned!
Polling consumer2
Polling consumer1
Polling consumer2
Polling consumer1
Polling consumer2
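
The answer does not include the test code itself, so what follows is only a minimal sketch of the kind of test that produces output like the log above. It uses the Kafka 0.10 Java consumer with a ConsumerRebalanceListener (not the old ZooKeeper-based high-level consumer from the question); the broker address, group id, and topic names test1/test2 are assumptions for illustration.

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceTest {

    // Starts one consumer in its own thread, subscribed to a single topic,
    // and prints every revocation/assignment the group rebalance triggers.
    static void startConsumer(final String name, final String topic) {
        System.out.println("Subscribing " + name + " to topic " + topic);
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "test-group");              // both consumers share this group
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(topic), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println(name + " got " + partitions.size() + " partitions revoked!");
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println(name + " got " + partitions.size() + " partitions assigned!");
            }
        });

        System.out.println("Starting thread for " + name);
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    System.out.println("Polling " + name);
                    consumer.poll(1000); // rebalance callbacks fire inside poll()
                }
            }
        }).start();
    }

    public static void main(String[] args) throws InterruptedException {
        startConsumer("consumer1", "test1");
        Thread.sleep(10000);                 // let consumer1 settle before the second consumer joins
        startConsumer("consumer2", "test2"); // joining the same group rebalances the whole group
    }
}

Run against a broker that has both topics, the second startConsumer() call joins the same group and triggers a full group rebalance, so consumer1 sees its partitions revoked and reassigned even though consumer2 never subscribes to test1.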