Kafka Consumers throwing java.lang.OutOfMemoryError: Direct buffer memory


Problem Description


I am using a single-node Kafka broker (0.10.2) and a single-node ZooKeeper (3.4.9). I have a consumer server (single core, 1.5 GB RAM). Whenever I run a process with 5 or more threads, my consumer threads are killed after throwing these exceptions:

  1. Exception 1

java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887)

  2. Exception 2

Uncaught exception in kafka-coordinator-heartbeat-thread | topic1: java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887)

I googled it and used the below-mentioned JVM parameters, but the same exceptions still occurred:

-XX:MaxDirectMemorySize=768m

-Xms512m

How do I fix this issue? Is any other JVM parameter tuning required?

My Kafka consumer code is:

import com.mongodb.DBObject
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.errors.WakeupException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.regex.Pattern

class KafkaPollingConsumer implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class)
private static final String TAG = "[KafkaPollingConsumer]"
private final KafkaConsumer<String, byte []> kafkaConsumer
private Map<TopicPartition,OffsetAndMetadata> currentOffsetsMap = new HashMap<>()
List topicNameList
Map kafkaTopicConfigMap = new HashMap<String,Object>()
Map kafkaTopicMessageListMap = new HashMap<String,List>()
Boolean isRebalancingTriggered = false
private final Long REBALANCING_SLEEP_TIME = 1000

public KafkaPollingConsumer(String serverType, String groupName, String topicNameRegex, Integer batchSize, Integer maxPollTime, Integer requestTime){
    logger.debug("{} [Constructor] [Enter] Thread Name {} serverType group Name TopicNameRegex",TAG,Thread.currentThread().getName(),serverType,groupName,topicNameRegex)
    logger.debug("Populating Property for kafak consumer")
    logger.debug("BatchSize {}",batchSize)
    Properties kafkaConsumerProperties = new Properties()
    kafkaConsumerProperties.put("group.id", groupName)
    kafkaConsumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaConsumerProperties.put("value.deserializer", "com.custom.kafkaconsumerv2.deserializer.CustomObjectDeserializer")
    switch(serverType){
        case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() :
            kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.priority.kafkaNode)
            kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.priority.consumer.enable.auto.commit)
            kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.priority.consumer.auto.offset.reset)
            break
        case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Bulk.toString() :
            kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.bulk.kafkaNode)
            kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.bulk.consumer.enable.auto.commit)
            kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.bulk.consumer.auto.offset.reset)
            kafkaConsumerProperties.put("max.poll.records",1)
            kafkaConsumerProperties.put("max.poll.interval.ms",600000)
            kafkaConsumerProperties.put("request.timeout.ms",600005)
            break
        default :
            // A Groovy throw needs a Throwable; a bare String fails at runtime
            throw new IllegalArgumentException("Invalid server type")
    }
    logger.debug("{} [Constructor] KafkaConsumer Property Populated {}",properties.toString())
    kafkaConsumer = new KafkaConsumer<String, byte []>(kafkaConsumerProperties)
    topicNameList = topicNameRegex.split(Pattern.quote('|'))
    logger.debug("{} [Constructor] Kafkatopic List {}",topicNameList.toString())
    logger.debug("{} [Constructor] Exit",TAG)
}

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        logger.error('{} In onPartitionsAssigned setting isRebalancingTriggered to false',TAG)
        isRebalancingTriggered = false
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        logger.error("{} In onPartitionsRevoked setting osRebalancingTriggered to true",TAG)
        isRebalancingTriggered = true
        publishAllKafkaTopicBatchMessages()
        commitOffset()

    }
}

private class AsyncCommitCallBack implements OffsetCommitCallback{

    @Override
    void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {

    }
}

@Override
void run() {
    logger.debug("{} Starting Thread ThreadName {}",TAG,Thread.currentThread().getName())
    populateKafkaConfigMap()
    initializeKafkaTopicMessageListMap()
    String topicName
    String consumerClassName
    String consumerMethodName
    Boolean isBatchJob
    Integer batchSize = 0
    final Thread mainThread = Thread.currentThread()
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            logger.error("{},gracefully shutdowning thread {}",TAG,mainThread.getName())
            kafkaConsumer.wakeup()
            try {
                mainThread.join()
            } catch (InterruptedException exception) {
                logger.error("{} Error : {}",TAG,exception.getStackTrace().join("\n"))
            }
        }
    })
    kafkaConsumer.subscribe(topicNameList , new HandleRebalance())
    try{
        while(true){
            logger.debug("{} Starting Consumer with polling time in ms 100",TAG)
            ConsumerRecords kafkaRecords
            if(isRebalancingTriggered == false) {
                kafkaRecords = kafkaConsumer.poll(100)
            }
            else{
                logger.error("{} in rebalancing going to sleep",TAG)
                Thread.sleep(REBALANCING_SLEEP_TIME)
                continue
            }
            for(ConsumerRecord record: kafkaRecords){
                if(isRebalancingTriggered == true){
                    break
                }
                currentOffsetsMap.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() +1))
                topicName = record.topic()
                DBObject kafkaTopicConfigDBObject = kafkaTopicConfigMap.get(topicName)
                consumerClassName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                consumerMethodName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                isBatchJob = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                logger.debug("Details about Message")
                logger.debug("Thread {}",mainThread.getName())
                logger.debug("Topic {}",topicName)
                logger.debug("Partition {}",record.partition().toString())
                logger.debug("Offset {}",record.offset().toString())
                logger.debug("clasName {}",consumerClassName)
                logger.debug("methodName {}",consumerMethodName)
                logger.debug("isBatchJob {}",isBatchJob.toString())
                Object message = record.value()
                logger.debug("message {}",message.toString())
                if(isBatchJob == true){
                    prepareMessagesBatch(topicName,message)
                    //batchSize = Integer.parseInt(kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY).toString())
                    //logger.debug("batchSize {}",batchSize.toString())
                }
                else{
                    publishMessageToNonBatchConsumer(consumerClassName,consumerMethodName,message)
                }
                //publishMessageToConsumers(consumerClassName,consumerMethodName,isBatchJob,batchSize,message,topicName)
                //try {
                //  kafkaConsumer.commitAsync(currentOffsetsMap,new AsyncCommitCallBack())
                logger.debug("{} Commiting Messages to Kafka",TAG)
                //}
                /*catch(Exception exception){
                    kafkaConsumer.commitSync(currentOffsetsMap)
                    currentOffsetsMap.clear()
                    logger.error("{} Error while commiting async so commiting in sync {}",TAG,exception.getStackTrace().join("\n"))
                }*/
            }
            commitOffset()
            publishAllKafkaTopicBatchMessages()
        }
    }
    catch(InterruptException exception){
        logger.error("{} In InterruptException",TAG)
        logger.error("{} In Exception exception message {}",TAG,exception.getMessage())
        logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
    }
    catch (WakeupException exception) {
        logger.error("{} In WakeUp Exception",TAG)
        logger.error("{} In Exception exception message {}",TAG,exception.getMessage())
        logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
    }
    catch(Exception exception){
        logger.error("{} In Exception",TAG)
        logger.error("{} In Exception exception message {}",TAG,exception.getMessage())
        logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
    }
    finally {
        logger.error("{} In finally commiting remaining offset ",TAG)
        publishAllKafkaTopicBatchMessages()
        //kafkaConsumer.commitSync(currentOffsetsMap)
        kafkaConsumer.close()
        logger.error("{} Exiting Consumer",TAG)
    }
}

private void commitOffset(){
    logger.debug("{} [commitOffset] Enter")
    logger.debug("{} currentOffsetMap {}",currentOffsetsMap.toString())
    if(currentOffsetsMap.size() > 0) {
        kafkaConsumer.commitSync(currentOffsetsMap)
        currentOffsetsMap.clear()
    }
    logger.debug("{} [commitOffset] Exit")

}

private void publishMessageToConsumers(String consumerClassName,String consumerMethodName,Boolean isBatchJob,Integer batchSize,Object message, String topicName){
    logger.debug("{} [publishMessageToConsumer] Enter",TAG)
    if(isBatchJob == true){
        publishMessageToBatchConsumer(consumerClassName, consumerMethodName,batchSize, message, topicName)
    }
    else{
        publishMessageToNonBatchConsumer(consumerClassName, consumerMethodName, message)
    }
    logger.debug("{} [publishMessageToConsumer] Exit",TAG)
}

private void publishMessageToNonBatchConsumer(String consumerClassName, String consumerMethodName, message){
    logger.debug("{} [publishMessageToNonBatchConsumer] Enter",TAG)
    executeConsumerMethod(consumerClassName,consumerMethodName,message)
    logger.debug("{} [publishMessageToNonBatchConsumer] Exit",TAG)
}

private void publishMessageToBatchConsumer(String consumerClassName, String consumerMethodName, Integer batchSize, Object message, String topicName){
    logger.debug("{} [publishMessageToBatchConsumer] Enter",TAG)
    List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
    consumerMessageList.add(message)
    if(consumerMessageList.size() == batchSize){
        logger.debug("{} [publishMessageToBatchConsumer] Pushing Messages In Batches",TAG)
        executeConsumerMethod(consumerClassName, consumerMethodName, consumerMessageList)
        consumerMessageList.clear()
    }
    kafkaTopicMessageListMap.put(topicName,consumerMessageList)
    logger.debug("{} [publishMessageToBatchConsumer] Exit",TAG)
}

private void populateKafkaConfigMap(){
    logger.debug("{} [populateKafkaConfigMap] Enter",TAG)
    KafkaTopicConfigDBService kafkaTopicConfigDBService = KafkaTopicConfigDBService.getInstance()
    topicNameList.each { topicName ->
        DBObject kafkaTopicDBObject = kafkaTopicConfigDBService.findByTopicName(topicName)
        kafkaTopicConfigMap.put(topicName,kafkaTopicDBObject)
    }
    logger.debug("{} [populateKafkaConfigMap] kafkaConfigMap {}",TAG,kafkaTopicConfigMap.toString())
    logger.debug("{} [populateKafkaConfigMap] Exit",TAG)
}

private void initializeKafkaTopicMessageListMap(){
    logger.debug("{} [initializeKafkaTopicMessageListMap] Enter",TAG)
    topicNameList.each { topicName ->
        kafkaTopicMessageListMap.put(topicName,[])
    }
    logger.debug("{} [populateKafkaConfigMap] kafkaTopicMessageListMap {}",TAG,kafkaTopicMessageListMap.toString())
    logger.debug("{} [initializeKafkaTopicMessageListMap] Exit",TAG)
}

private void executeConsumerMethod(String className, String methodName, def messages){
    try{
        logger.debug("{} [executeConsumerMethod] Enter",TAG)
        logger.debug("{} [executeConsumerMethod] className  {} methodName {} messages {}",TAG,className,methodName,messages.toString())
        Class.forName(className)."$methodName"(messages)
    } catch (Exception exception){
        logger.error("{} [{}] Error while executing method : {} of class: {} with params : {} - {}", TAG, Thread.currentThread().getName(), methodName,
                className, messages.toString(), exception.getStackTrace().join("\n"))
    }
    logger.debug("{} [executeConsumerMethod] Exit",TAG)
}

private void publishAllKafkaTopicBatchMessages(){
    logger.debug("{} [publishAllKafkaTopicBatchMessages] Enter",TAG)
    String consumerClassName = null
    String consumerMethodName = null
    kafkaTopicMessageListMap.each { topicName, messageList ->
        if (messageList != null && messageList.size() > 0) {
            DBObject kafkaTopicDBObject = kafkaTopicConfigMap.get(topicName)
            consumerClassName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
            consumerMethodName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
            logger.debug("{} Pushing message in topic {} className {} methodName {} ", TAG, topicName, consumerClassName, consumerMethodName)
            if (messageList != null && messageList.size() > 0) {
                executeConsumerMethod(consumerClassName, consumerMethodName, messageList)
                messageList.clear()
                kafkaTopicMessageListMap.put(topicName, messageList)

            }
        }
    }
    logger.debug("{} [publishAllKafkaTopicBatchMessages] Exit",TAG)
}

private void prepareMessagesBatch(String topicName,Object message){
    logger.debug("{} [prepareMessagesBatch] Enter",TAG)
    logger.debug("{} [prepareMessagesBatch] preparing batch for topic {}",TAG,topicName)
    logger.debug("{} [prepareMessagesBatch] preparting batch for message {}",TAG,message.toString())
    List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
    consumerMessageList.add(message)
    kafkaTopicMessageListMap.put(topicName,consumerMessageList)

}

}

Solution

Kafka consumers handle data backlog through the following two parameters:

max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
Default value is 300000.

max.poll.records
The maximum number of records returned in a single call to poll().
Default value is 500.

Failing to set these two parameters according to your requirements can let a single poll fetch more data than the consumer can process with the available resources, leading to OutOfMemory errors or, at times, failure to commit the consumer offsets. Hence, it is always advisable to set the max.poll.records and max.poll.interval.ms parameters explicitly; the rough estimate below shows why the defaults can overwhelm a 1.5 GB machine.
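
As a back-of-envelope sketch in Groovy (the average message size here is an assumed figure for illustration, not something measured from your topics):

    // Worst-case buffering for one poll() under the default settings
    int maxPollRecords = 500            // Kafka default for max.poll.records
    long avgMessageBytes = 1000000L     // assumption: ~1 MB per message
    long worstCaseBytes = maxPollRecords * avgMessageBytes
    // ~476 MB in a single poll -- a large share of a 1.5 GB box, per consumer thread
    println "One poll may buffer up to ${worstCaseBytes.intdiv(1024 * 1024)} MB"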

In your code, the KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() case is missing these two parameters, which could well be the cause of the OutOfMemory problem during polling. A possible fix is sketched below.
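
A minimal sketch of the Priority case with both parameters added. The values below are illustrative assumptions, not tested recommendations; tune them to your message size and per-message processing time:

    case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() :
        kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.priority.kafkaNode)
        kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.priority.consumer.enable.auto.commit)
        kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.priority.consumer.auto.offset.reset)
        // Assumed value: cap each poll to a small batch so one core / 1.5 GB RAM is not flooded
        kafkaConsumerProperties.put("max.poll.records", 10)
        // Allow enough time to process one batch before the group rebalances (300000 ms is the default)
        kafkaConsumerProperties.put("max.poll.interval.ms", 300000)
        break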
