卡夫卡消费者抛出java.lang.OutOfMemoryError:直接缓冲内存 [英] Kafka Consumers throwing java.lang.OutOfMemoryError: Direct buffer memory

查看:1331
本文介绍了卡夫卡消费者抛出java.lang.OutOfMemoryError:直接缓冲内存的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用单节点Kafka代理(0.10.2)和单节点Zookeeper代理(3.4.9).我有一台使用者服务器(单核和1.5 GB RAM ).每当我运行具有5个或更多线程的进程时,抛出这些异常后,我的使用者的线程就会被杀死

I am using single node Kafka broker (0.10.2) and single node zookeeper broker (3.4.9). I am having a consumer server (single core and 1.5 GB RAM). Whenever I am running a process with 5 or more threads my consumer's threads are getting killed after throwing these exceptions

  1. 例外1

java.lang.OutOfMemoryError:Java堆空间 在java.nio.HeapByteBuffer.(HeapByteBuffer.java:57) 在java.nio.ByteBuffer.allocate(ByteBuffer.java:335) 在org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93) 在org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) 在org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169) 在org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150) 在org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355) 在org.apache.kafka.common.network.Selector.poll(Selector.java:303) 在org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) 在org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) 在org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263) 在org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ HeartbeatThread.run(AbstractCoordinator.java:887)

java.lang.OutOfMemoryError: Java heap space at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57) at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:93) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355) at org.apache.kafka.common.network.Selector.poll(Selector.java:303) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887)

  1. 异常2

kafka-coordinator-heartbeat-thread中未捕获的异常|主题1: java.lang.OutOfMemoryError:直接缓冲存储器 在java.nio.Bits.reserveMemory(Bits.java:693) 在java.nio.DirectByteBuffer.(DirectByteBuffer.java:123) 在java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) 在sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241) 在sun.nio.ch.IOUtil.read(IOUtil.java:195) 在sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) 在org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110) 在org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97) 在org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) 在org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169) 在org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150) 在org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355) 在org.apache.kafka.common.network.Selector.poll(Selector.java:303) 在org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) 在org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) 在org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263) 在org.apache.kafka.clients.consumer.internals.AbstractCoordinator $ HeartbeatThread.run(AbstractCoordinator.java:887)

Uncaught exception in kafka-coordinator-heartbeat-thread | topic1: java.lang.OutOfMemoryError: Direct buffer memory at java.nio.Bits.reserveMemory(Bits.java:693) at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123) at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241) at sun.nio.ch.IOUtil.read(IOUtil.java:195) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110) at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355) at org.apache.kafka.common.network.Selector.poll(Selector.java:303) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:263) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:887)

我用谷歌搜索并使用了下面提到的JVM参数,但是仍然发生了相同的异常

I googled it and used below-mentioned JVM parameters but still the same exceptions occurred

-XX:MaxDirectMemorySize = 768m

-XX:MaxDirectMemorySize=768m

-Xms512m

如何解决此问题?是否需要其他任何javm参数调整?

How to fix this issue?Is any other javm parameter tuning required?

我的Kafka消费者代码是

My Kafka consumer Code is

import com.mongodb.DBObject
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.consumer.OffsetCommitCallback
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.InterruptException
import org.apache.kafka.common.errors.WakeupException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.regex.Pattern

class KafkaPollingConsumer implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(KafkaPollingConsumer.class)
private static final String TAG = "[KafkaPollingConsumer]"
private final KafkaConsumer<String, byte []> kafkaConsumer
private Map<TopicPartition,OffsetAndMetadata> currentOffsetsMap = new HashMap<>()
List topicNameList
Map kafkaTopicConfigMap = new HashMap<String,Object>()
Map kafkaTopicMessageListMap = new HashMap<String,List>()
Boolean isRebalancingTriggered = false
private final Long REBALANCING_SLEEP_TIME = 1000

public KafkaPollingConsumer(String serverType, String groupName, String topicNameRegex, Integer batchSize, Integer maxPollTime, Integer requestTime){
    logger.debug("{} [Constructor] [Enter] Thread Name {} serverType group Name TopicNameRegex",TAG,Thread.currentThread().getName(),serverType,groupName,topicNameRegex)
    logger.debug("Populating Property for kafak consumer")
    logger.debug("BatchSize {}",batchSize)
    Properties kafkaConsumerProperties = new Properties()
    kafkaConsumerProperties.put("group.id", groupName)
    kafkaConsumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaConsumerProperties.put("value.deserializer", "com.custom.kafkaconsumerv2.deserializer.CustomObjectDeserializer")
    switch(serverType){
        case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() :
            kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.priority.kafkaNode)
            kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.priority.consumer.enable.auto.commit)
            kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.priority.consumer.auto.offset.reset)
            break
        case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Bulk.toString() :
            kafkaConsumerProperties.put("bootstrap.servers",ConfigLoader.conf.kafkaServer.bulk.kafkaNode)
            kafkaConsumerProperties.put("enable.auto.commit",ConfigLoader.conf.kafkaServer.bulk.consumer.enable.auto.commit)
            kafkaConsumerProperties.put("auto.offset.reset",ConfigLoader.conf.kafkaServer.bulk.consumer.auto.offset.reset)
            kafkaConsumerProperties.put("max.poll.records",1)
            kafkaConsumerProperties.put("max.poll.interval.ms",600000)
            kafkaConsumerProperties.put("request.timeout.ms",600005)
            break
        default :
            throw "Invalid server type"
            break
    }
    logger.debug("{} [Constructor] KafkaConsumer Property Populated {}",properties.toString())
    kafkaConsumer = new KafkaConsumer<String, byte []>(kafkaConsumerProperties)
    topicNameList = topicNameRegex.split(Pattern.quote('|'))
    logger.debug("{} [Constructor] Kafkatopic List {}",topicNameList.toString())
    logger.debug("{} [Constructor] Exit",TAG)
}

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        logger.error('{} In onPartitionAssigned setting isRebalancingTriggered to false',TAG)
        isRebalancingTriggered = false
    }

    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        logger.error("{} In onPartitionsRevoked setting osRebalancingTriggered to true",TAG)
        isRebalancingTriggered = true
        publishAllKafkaTopicBatchMessages()
        commitOffset()

    }
}

private class AsyncCommitCallBack implements OffsetCommitCallback{

    @Override
    void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {

    }
}

@Override
void run() {
    logger.debug("{} Starting Thread ThreadName {}",TAG,Thread.currentThread().getName())
    populateKafkaConfigMap()
    initializeKafkaTopicMessageListMap()
    String topicName
    String consumerClassName
    String consumerMethodName
    Boolean isBatchJob
    Integer batchSize = 0
    final Thread mainThread = Thread.currentThread()
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            logger.error("{},gracefully shutdowning thread {}",TAG,mainThread.getName())
            kafkaConsumer.wakeup()
            try {
                mainThread.join()
            } catch (InterruptedException exception) {
                logger.error("{} Error : {}",TAG,exception.getStackTrace().join("\n"))
            }
        }
    })
    kafkaConsumer.subscribe(topicNameList , new HandleRebalance())
    try{
        while(true){
            logger.debug("{} Starting Consumer with polling time in ms 100",TAG)
            ConsumerRecords kafkaRecords
            if(isRebalancingTriggered == false) {
                kafkaRecords = kafkaConsumer.poll(100)
            }
            else{
                logger.error("{} in rebalancing going to sleep",TAG)
                Thread.sleep(REBALANCING_SLEEP_TIME)
                continue
            }
            for(ConsumerRecord record: kafkaRecords){
                if(isRebalancingTriggered == true){
                    break
                }
                currentOffsetsMap.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() +1))
                topicName = record.topic()
                DBObject kafkaTopicConfigDBObject = kafkaTopicConfigMap.get(topicName)
                consumerClassName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
                consumerMethodName = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
                isBatchJob = kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.IS_BATCH_JOB_KEY)
                logger.debug("Details about Message")
                logger.debug("Thread {}",mainThread.getName())
                logger.debug("Topic {}",topicName)
                logger.debug("Partition {}",record.partition().toString())
                logger.debug("Offset {}",record.offset().toString())
                logger.debug("clasName {}",consumerClassName)
                logger.debug("methodName {}",consumerMethodName)
                logger.debug("isBatchJob {}",isBatchJob.toString())
                Object message = record.value()
                logger.debug("message {}",message.toString())
                if(isBatchJob == true){
                    prepareMessagesBatch(topicName,message)
                    //batchSize = Integer.parseInt(kafkaTopicConfigDBObject.get(KafkaTopicConfigEntity.BATCH_SIZE_KEY).toString())
                    //logger.debug("batchSize {}",batchSize.toString())
                }
                else{
                    publishMessageToNonBatchConsumer(consumerClassName,consumerMethodName,message)
                }
                //publishMessageToConsumers(consumerClassName,consumerMethodName,isBatchJob,batchSize,message,topicName)
                //try {
                //  kafkaConsumer.commitAsync(currentOffsetsMap,new AsyncCommitCallBack())
                logger.debug("{} Commiting Messages to Kafka",TAG)
                //}
                /*catch(Exception exception){
                    kafkaConsumer.commitSync(currentOffsetsMap)
                    currentOffsetsMap.clear()
                    logger.error("{} Error while commiting async so commiting in sync {}",TAG,exception.getStackTrace().join("\n"))
                }*/
            }
            commitOffset()
            publishAllKafkaTopicBatchMessages()
        }
    }
    catch(InterruptException exception){
        logger.error("{} In InterruptException",TAG)
        logger.error("{} In Exception exception message {}",TAG,exception.getMessage())
        logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
    }
    catch (WakeupException exception) {
        logger.error("{} In WakeUp Exception",TAG)
        logger.error("{} In Exception exception message {}",TAG,exception.getMessage())
        logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
    }
    catch(Exception exception){
        exception.getMessage()
        logger.error("{} In Exception",TAG)
        logger.error("{} In Exception exception message {}",TAG,exception.getMessage())
        logger.error("{} Exception {}",TAG,exception.getStackTrace().join("\n"))
    }
    finally {
        logger.error("{} In finally commiting remaining offset ",TAG)
        publishAllKafkaTopicBatchMessages()
        //kafkaConsumer.commitSync(currentOffsetsMap)
        kafkaConsumer.close()
        logger.error("{} Exiting Consumer",TAG)
    }
}

private void commitOffset(){
    logger.debug("{} [commitOffset] Enter")
    logger.debug("{} currentOffsetMap {}",currentOffsetsMap.toString())
    if(currentOffsetsMap.size() > 0) {
        kafkaConsumer.commitSync(currentOffsetsMap)
        currentOffsetsMap.clear()
    }
    logger.debug("{} [commitOffset] Exit")

}

private void publishMessageToConsumers(String consumerClassName,String consumerMethodName,Boolean isBatchJob,Integer batchSize,Object message, String topicName){
    logger.debug("{} [publishMessageToConsumer] Enter",TAG)
    if(isBatchJob == true){
        publishMessageToBatchConsumer(consumerClassName, consumerMethodName,batchSize, message, topicName)
    }
    else{
        publishMessageToNonBatchConsumer(consumerClassName, consumerMethodName, message)
    }
    logger.debug("{} [publishMessageToConsumer] Exit",TAG)
}

private void publishMessageToNonBatchConsumer(String consumerClassName, String consumerMethodName, message){
    logger.debug("{} [publishMessageToNonBatchConsumer] Enter",TAG)
    executeConsumerMethod(consumerClassName,consumerMethodName,message)
    logger.debug("{} [publishMessageToNonBatchConsumer] Exit",TAG)
}

private void publishMessageToBatchConsumer(String consumerClassName, String consumerMethodName, Integer batchSize, Object message, String topicName){
    logger.debug("{} [publishMessageToBatchConsumer] Enter",TAG)
    List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
    consumerMessageList.add(message)
    if(consumerMessageList.size() == batchSize){
        logger.debug("{} [publishMessageToBatchConsumer] Pushing Messages In Batches",TAG)
        executeConsumerMethod(consumerClassName, consumerMethodName, consumerMessageList)
        consumerMessageList.clear()
    }
    kafkaTopicMessageListMap.put(topicName,consumerMessageList)
    logger.debug("{} [publishMessageToBatchConsumer] Exit",TAG)
}

private void populateKafkaConfigMap(){
    logger.debug("{} [populateKafkaConfigMap] Enter",TAG)
    KafkaTopicConfigDBService kafkaTopicConfigDBService = KafkaTopicConfigDBService.getInstance()
    topicNameList.each { topicName ->
        DBObject kafkaTopicDBObject = kafkaTopicConfigDBService.findByTopicName(topicName)
        kafkaTopicConfigMap.put(topicName,kafkaTopicDBObject)
    }
    logger.debug("{} [populateKafkaConfigMap] kafkaConfigMap {}",TAG,kafkaTopicConfigMap.toString())
    logger.debug("{} [populateKafkaConfigMap] Exit",TAG)
}

private void initializeKafkaTopicMessageListMap(){
    logger.debug("{} [initializeKafkaTopicMessageListMap] Enter",TAG)
    topicNameList.each { topicName ->
        kafkaTopicMessageListMap.put(topicName,[])
    }
    logger.debug("{} [populateKafkaConfigMap] kafkaTopicMessageListMap {}",TAG,kafkaTopicMessageListMap.toString())
    logger.debug("{} [initializeKafkaTopicMessageListMap] Exit",TAG)
}

private void executeConsumerMethod(String className, String methodName, def messages){
    try{
        logger.debug("{} [executeConsumerMethod] Enter",TAG)
        logger.debug("{} [executeConsumerMethod] className  {} methodName {} messages {}",TAG,className,methodName,messages.toString())
        Class.forName(className)."$methodName"(messages)
    } catch (Exception exception){
        logger.error("{} [{}] Error while executing method : {} of class: {} with params : {} - {}", TAG, Thread.currentThread().getName(), methodName,
                className, messages.toString(), exception.getStackTrace().join("\n"))
    }
    logger.debug("{} [executeConsumerMethod] Exit",TAG)
}

private void publishAllKafkaTopicBatchMessages(){
    logger.debug("{} [publishAllKafkaTopicBatchMessages] Enter",TAG)
    String consumerClassName = null
    String consumerMethodName = null
    kafkaTopicMessageListMap.each { topicName, messageList ->
        if (messageList != null && messageList.size() > 0) {
            DBObject kafkaTopicDBObject = kafkaTopicConfigMap.get(topicName)
            consumerClassName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.CLASS_NAME_KEY)
            consumerMethodName = kafkaTopicDBObject.get(KafkaTopicConfigEntity.METHOD_NAME_KEY)
            logger.debug("{} Pushing message in topic {} className {} methodName {} ", TAG, topicName, consumerClassName, consumerMethodName)
            if (messageList != null && messageList.size() > 0) {
                executeConsumerMethod(consumerClassName, consumerMethodName, messageList)
                messageList.clear()
                kafkaTopicMessageListMap.put(topicName, messageList)

            }
        }
    }
    logger.debug("{} [publishAllKafkaTopicBatchMessages] Exit",TAG)
}

private void prepareMessagesBatch(String topicName,Object message){
    logger.debug("{} [prepareMessagesBatch] Enter",TAG)
    logger.debug("{} [prepareMessagesBatch] preparing batch for topic {}",TAG,topicName)
    logger.debug("{} [prepareMessagesBatch] preparting batch for message {}",TAG,message.toString())
    List consumerMessageList = kafkaTopicMessageListMap.get(topicName)
    consumerMessageList.add(message)
    kafkaTopicMessageListMap.put(topicName,consumerMessageList)

}

}

推荐答案

Kafka Consumers通过以下两个参数来处理积压的数据,

Kafka Consumers handles the data backlog by the following two parameters,

最大轮询间隔ms
使用使用者组管理时,调用poll()之间的最大延迟.这为使用者在获取更多记录之前可以处于空闲状态的时间设置了上限.如果在此超时到期前未调用poll(),则认为使用方失败,该组将重新平衡以将分区重新分配给另一个成员.
默认值为300000.

max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
Default value is 300000.

最大民意测验记录
一次调用poll()返回的最大记录数.
默认值为500.

max.poll.records
The maximum number of records returned in a single call to poll().
Default value is 500.

根据要求忽略设置以上两个参数可能会导致轮询使用者可能无法使用可用资源处理的最大数据,从而导致内存不足或有时无法提交使用者偏移量.因此,始终建议使用 max.poll.records max.poll.interval.ms 参数.

Ignoring to set the above two parameters according to the requirement could lead to polling of maximum data which the consumer may not be able to handle with the available resources, leading to OutOfMemory or failure to commit the consumer offset at times. Hence, it is always advisable to use the max.poll.records and max.poll.interval.ms parameters.

在您的代码中,情况 KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString()缺少这两个参数,这可能是轮询期间内存不足问题的原因.

In your code, the case KafkaTopicConfigEntity.KAFKA_NODE_TYPE_ENUM.Priority.toString() is missing these two parameters which could possibly be the cause of the OutOfMemory problem during polling.

这篇关于卡夫卡消费者抛出java.lang.OutOfMemoryError:直接缓冲内存的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆