Flink + Kafka, java.lang.OutOfMemoryError when parallelism > 1


Problem description

I have a toy Flink job which reads from 3 Kafka topics and then unions all these 3 streams. That's all, no extra work.
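For context, a minimal sketch of such a job might look like the following. The broker address, group id, and topic names are placeholders, and I'm assuming the Kafka 0.10 connector (FlinkKafkaConsumer010) since we run Kafka 0.10 with Flink 1.5:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class UnionThreeTopicsJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2); // parallelism > 1 is what triggers the error

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // placeholder address
        props.setProperty("group.id", "toy-union-job");        // placeholder group id

        // One consumer per topic, all using the same simple string schema.
        DataStream<String> s1 = env.addSource(
                new FlinkKafkaConsumer010<>("topic-a", new SimpleStringSchema(), props));
        DataStream<String> s2 = env.addSource(
                new FlinkKafkaConsumer010<>("topic-b", new SimpleStringSchema(), props));
        DataStream<String> s3 = env.addSource(
                new FlinkKafkaConsumer010<>("topic-c", new SimpleStringSchema(), props));

        // Union the three streams; no further processing.
        s1.union(s2, s3).print();

        env.execute("Union of three Kafka topics");
    }
}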

If I use parallelism 1 for my Flink job, everything seems fine; as soon as I change parallelism to > 1, it fails with:

java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:110)
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:97)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257)

How come it works with parallelism 1 but not with parallelism > 1?

Is it related to a Kafka server-side setting? Or is it related to the consumer settings in my Java code (there is no special config in my code yet)?

I know the information provided here may not be sufficient, but I'm not able to touch the Kafka cluster. I just hope that someone may have run into the same error before and can share some suggestions with me.

I'm using Kafka 0.10 and Flink 1.5.

Thanks a lot.

Recommended answer

As you can see in the error log, this error comes from your Kafka cluster. The issue occurs when the direct buffer memory of the Kafka broker exceeds the heap size assigned to the JVM; direct buffer memory is allocated from the JVM's heap as the application requires it. When you use parallelism > 1, multiple Flink tasks, min(number of Flink slots, number of Kafka partitions), consume data from Kafka at the same time, which uses more of the Kafka brokers' heap than when parallelism equals one, and the error occurs. The standard solution is to increase the heap size available to the Kafka brokers by adding the KAFKA_HEAP_OPTS variable to the Kafka env file or as an OS environment variable. For example, add the following line to set the heap size to 2 GB:

export KAFKA_HEAP_OPTS="-Xms2G -Xmx2G"

But in your case, where there is no access to the Kafka brokers (according to your question), you can decrease the number of records returned in a single call to poll(), so less heap memory will be needed on the brokers. (This is not a standard solution; I suggest it only to make the error go away.)

From this answer:

Kafka consumers handle the data backlog with the following two parameters:

max.poll.interval.ms
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before the expiration of this timeout, the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. The default value is 300000.

max.poll.records
The maximum number of records returned in a single call to poll(). The default value is 500.

Failing to set the above two parameters according to your requirements could lead to polling more data than the consumer can handle with the available resources, leading to an OutOfMemoryError or, at times, failure to commit the consumer offset. Hence, it is always advisable to set the max.poll.records and max.poll.interval.ms parameters.

So, as a test, decrease the value of max.poll.records to, for example, 250 and check whether the error still occurs.

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", BOOTSTRAPSERVERS); // e.g. "broker1:9092,broker2:9092"
properties.setProperty("group.id", ID);
// Optional: the Flink connector installs its own byte-array deserializers internally,
// so these two settings are effectively ignored.
properties.setProperty("key.deserializer", Deserializer);
properties.setProperty("value.deserializer", Deserializer);
// Fetch at most 250 records per poll() to reduce memory pressure.
properties.setProperty("max.poll.records", "250");

FlinkKafkaConsumer010<String> myConsumer =
    new FlinkKafkaConsumer010<>("topic", new SimpleStringSchema(), properties);
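For completeness, continuing the snippet above, you could also bound how long the consumer may sit idle between polls via max.poll.interval.ms and then attach the consumer to the job. Note that max.poll.interval.ms only exists in Kafka clients 0.10.1 and newer, and the value shown here is just illustrative:

// Optional, illustrative value: upper bound on idle time between poll() calls
// (max.poll.interval.ms requires Kafka clients 0.10.1+).
properties.setProperty("max.poll.interval.ms", "300000");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.addSource(myConsumer);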
