OutOfMemoryError when restarting my Kafka Streams application

Problem description

I have a Kafka Streams app (Kafka Streams 2.1 + Kafka broker 2.0) which does an aggregation based on TimeWindows, and I use the suppress operator to suppress the result's output.

Everything works well until I restart my app: it resets the offset of KTABLE-SUPPRESS-STATE-STORE to 0 to restore the suppression state, as expected. But each time I restart it, it throws an OutOfMemoryError. I thought maybe the heap size was not enough, so I used a larger Xmx/Xms; that works for one or two restarts, and then the OutOfMemoryError comes back. The Xmx is now about 20G, and I think something is not right here.

Code snippet:

// Time windows with an explicit retention period and grace period
TimeWindows windows = TimeWindows.of(windowSize).until(retentionHours.toMillis()).grace(graceHours);

// Aggregate per serial number within each window, then suppress
// intermediate results for timeToWait, buffering at most bufferMaxBytes
KTable<Windowed<String>, MyStatistics> kTable = groupedBySerialNumber
        .windowedBy(windows)
        .aggregate(MyStatistics::new,
                (sn, resList, stats) -> stats.addResources(resList).updateSN(sn),
                Materialized.with(Serdes.String(), ArchiveSerdes.resourceStatistics()))
        .suppress(Suppressed.untilTimeLimit(timeToWait, Suppressed.BufferConfig.maxBytes(bufferMaxBytes)));

And I find that the key of a record in KTABLE-SUPPRESS-STATE-STORE is something like 1234567j�P, which is not readable. I guess it's generated by combining the SN and the window, and I think this makes KTABLE-SUPPRESS-STATE-STORE redundant, because each SN will have multiple records, one per window.

I have two questions:

  1. Does the OutOfMemoryError indicate that the heap size is too small? If so, how can I limit the rate? If not, what does it mean?
  2. Which API defines the key for KTABLE-SUPPRESS-STATE-STORE, and how can (or should) I control it?

Thanks!

Edited on 2019/4/16

The error stack trace is:

java.lang.OutOfMemoryError: Java heap space        
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)        
        at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)        
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)        
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)        
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)        
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:467)        
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)        
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:88)
        at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

Answer

Does the OutOfMemoryError indicate that the heap size is too small? If so, how can I limit the rate? If not, what does it mean?

Yes, there's not enough heap to allocate all the memory that the application needs to operate. We don't see this very often, and the suppression operator is new, so I'm suspicious of it, but it's good to bear in mind that basically any data structure in your application could be responsible.

The best way to diagnose memory pressure is to do a "heap dump". This basically copies your JVM's entire memory into a file, so that you can analyse its contents using a program like https://www.eclipse.org/mat/ . It'll be a little bit of a learning curve, but I think you'll find that some facility with analyzing memory usage is very handy in general.

You can trigger a heap dump at any time (there are several ways to do it, you'll have to research the best way for you). But I think you'll want to make use of Java's nifty option to do a heap dump when it gets an out-of-memory error. This way, you're more likely to positively identify the culprit. See https://docs.oracle.com/javase/7/docs/webnotes/tsg/TSG-VM/html/clopts.html#gbzrr , or similar for your JVM.
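
For example, on HotSpot JVMs the relevant options look like this (the jar name and paths are placeholders):

        # Write a heap dump automatically when an OutOfMemoryError is thrown
        java -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/dumps -jar my-streams-app.jar

        # Or take a dump of a running JVM on demand
        jmap -dump:format=b,file=/tmp/heap.hprof <pid>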

I can speculate about the cause of the heap dump, but I'm afraid I might just lead you astray and waste your time. Once you have the results of the dump, I think you should go ahead and open a bug report in the Kafka issue tracker: https://issues.apache.org/jira/projects/KAFKA . Then, we can help figure out both how to work around the bug to get you running again, and also how to fix it in future releases.

Actually, I will offer one speculation... It's possible you're seeing a result of this bug: https://github.com/apache/kafka/pull/6536 (https://issues.apache.org/jira/browse/KAFKA-7895) . If your OOME goes away when you remove the suppression operator, you might want to leave it out for now. As soon as we merge the fix, I'll request a bugfix release, and you can try again to see if the problem is resolved.

Which API defines the key for KTABLE-SUPPRESS-STATE-STORE, and how can (or should) I control it?

Fortunately, this has a more straightforward answer. The key you're looking at is a binary-packed version of your record key and the timestamp of the window. This key is a result of your use of windowedBy. In Java, you can see that the result of the aggregation is a KTable<Windowed<String>, ...> and that suppress doesn't change the key or value type. In other words, you're looking at a serialized version of the key (Windowed<String>).
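
If you want to see the two parts of the key in readable form, it's easier to unpack the Windowed<String> in code than to decode the serialized bytes. A minimal sketch, reusing the kTable variable from the question (the print format is just for illustration):

        // Each downstream key is a Windowed<String>: the original record key
        // (the SN) plus the window's start/end timestamps.
        kTable.toStream().foreach((windowedKey, stats) ->
                System.out.printf("sn=%s windowStart=%d windowEnd=%d%n",
                        windowedKey.key(),
                        windowedKey.window().start(),
                        windowedKey.window().end()));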

Leaving suppression aside for a second, let's say you have two serial numbers, "asdf" and "zxcv", and your window size is one hour. Your application is grouping events for each of those serial numbers (independently) in each hour of the day. So there's an aggregation for all the "asdf" records from 9:00 to 10:00, and there's also one for all the "zxcv" records from 9:00 to 10:00. Thus, the total number of keys in the windowed KTable is (key space) × (number of windows being retained). For example, with those two serial numbers and 24 retained one-hour windows, that's 2 × 24 = 48 keys.

The Suppression operator will have no effect on the number of keys in the KTable. Its purpose is to suppress updates to those keys for a specified amount of time (timeToWait). For example, without suppress, if you get 3 updates to the "asdf" record between 9:00 and 10:00, the windowed aggregation will emit an updated result for (asdf, 9:00) each time, so for 3 events in, you see 3 result updates coming out. The Suppress operator just prevents those result updates until timeToWait has passed, and when it does pass, it emits only the most recent update.
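
To make that concrete, here is a minimal sketch of the same pipeline (hypothetical window size and wait time, with count() standing in for your aggregation):

        // Without suppress, every incoming event produces an updated result
        // for its (key, window) downstream. With suppress, updates are buffered
        // and only the latest one is emitted once timeToWait has elapsed.
        KTable<Windowed<String>, Long> counts = groupedBySerialNumber
                .windowedBy(TimeWindows.of(Duration.ofHours(1)))
                .count()
                .suppress(Suppressed.untilTimeLimit(
                        Duration.ofMinutes(10),                  // timeToWait
                        Suppressed.BufferConfig.unbounded()));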

So, the number of keys in the suppression buffer at any time is smaller than the total number of keys in the upstream KTable. It just contains the keys that have been updated in the last timeToWait amount of time.

Does that help?
