Flink + Kafka, java.lang.OutOfMemoryError when parallelism > 1


Question


I've a toy Flink job which reads from 3 kafka topics, then union all these 3 streams. That's all, no extra work.
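The job looks roughly like this (a minimal sketch with placeholder topic names and bootstrap address, using the Kafka 0.10 connector):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
props.setProperty("group.id", "toy-union-job");        // placeholder; no special config

// One consumer per topic ...
DataStream<String> s1 = env.addSource(new FlinkKafkaConsumer010<>("topic-1", new SimpleStringSchema(), props));
DataStream<String> s2 = env.addSource(new FlinkKafkaConsumer010<>("topic-2", new SimpleStringSchema(), props));
DataStream<String> s3 = env.addSource(new FlinkKafkaConsumer010<>("topic-3", new SimpleStringSchema(), props));

// ... then union the three streams; no further processing.
s1.union(s2, s3).print();

env.execute("toy union job");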

If using parallelism 1 for my Flink job, everything seems fine, but as soon as I change parallelism to > 1, it fails with:

java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)

How come it works with parallelism 1 but not with parallelism > 1?

Is it related to a Kafka server-side setting? Or is it related to a consumer setting in my Java code (no special config yet in my code)?

I know that the info provided here may not be sufficient, but I'm not able to touch the Kafka cluster. I just hope that some guru may have run into the same error before and can share some suggestions with me.

I'm using Kafka 0.10 and Flink 1.5.

Many thanks.

Solution

As you can see in the error logs, this error comes from your Kafka cluster. It occurs when a Kafka broker's direct buffer memory exceeds the limit given to the JVM; direct buffers live outside the heap, but their limit (-XX:MaxDirectMemorySize) defaults to the maximum heap size, which is why the heap settings matter here. When you use parallelism > 1, multiple Flink tasks, min(number of Flink slots, number of Kafka partitions), consume data from Kafka at the same time, putting more pressure on the brokers' memory than parallelism 1 does, and the error appears. The standard solution is to increase the heap size available to the Kafka brokers by adding the KAFKA_HEAP_OPTS variable to the Kafka env file or as an OS environment variable. For example, add the following line to set the heap size to 2 GB:

export KAFKA_HEAP_OPTS="-Xms2G -Xmx2G"
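(On a stock installation, the bundled bin/kafka-server-start.sh script picks up KAFKA_HEAP_OPTS and falls back to -Xmx1G -Xms1G when it is unset, so exporting the variable in the shell or service definition that starts the broker is enough.)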

But in your case, where you have no access to the Kafka brokers (according to your question), you can decrease the number of records returned in a single call to poll(), so less broker-side memory will be needed. (This is not a standard solution; I suggest it only to make the error go away.)

From this answer:

Kafka consumers handle the data backlog with the following two parameters:

max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before the expiration of this timeout, the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. The default value is 300000 (ms).

max.poll.records
The maximum number of records returned in a single call to poll(). The default value is 500.

Failing to set the above two parameters according to your requirements can lead to polling more data than the consumer can handle with the available resources, causing OutOfMemory errors or, at times, failure to commit the consumer offset. Hence, it is always advisable to set the max.poll.records and max.poll.interval.ms parameters.

So, as a test, decrease the value of max.poll.records to, for example, 250, and check whether the error still occurs.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAPSERVERS); // your broker list
properties.setProperty("group.id", ID);                        // your consumer group
// Cap the records returned by a single poll(); the default is 500.
properties.setProperty("max.poll.records", "250");

// The 0.10 connector matches your Kafka version (the 0.8 connector is based on
// the legacy consumer, which ignores max.poll.records); SimpleStringSchema
// handles the deserialization, so no key/value deserializer properties are needed.
FlinkKafkaConsumer010<String> myConsumer =
    new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties);
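For what it's worth, the Properties object passed to the connector is forwarded to the KafkaConsumer it runs internally (the KafkaConsumerThread at the bottom of your stack trace), so consumer-level settings such as max.poll.records, and max.poll.interval.ms if you need it, take effect through this object.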
