OutOfMemoryError when restarting my Kafka Streams application

Problem description

I have a Kafka Streams app (Kafka Streams 2.1 + Kafka broker 2.0) which does an aggregation based on TimeWindows, and I use the suppress operator to suppress the result's output.

Everything works well until I restart my app: it resets the offset of KTABLE-SUPPRESS-STATE-STORE to 0 to restore the suppression state, as expected. But each time I restart it, it throws an OutOfMemoryError. I thought maybe the heap size was not enough, so I used a larger Xmx/Xms; that works for one or two restarts, and then the OutOfMemoryError comes back again. The Xmx is now about 20G, so I think something is not right here.

Code snippet:

// Tumbling windows of windowSize, retained for retentionHours, accepting
// late records up to graceHours after the window end.
TimeWindows windows = TimeWindows.of(windowSize).until(retentionHours.toMillis()).grace(graceHours);

KTable<Windowed<String>, MyStatistics> kTable = groupedBySerialNumber
                .windowedBy(windows)
                // Aggregate the resource lists per serial number and per window.
                .aggregate(MyStatistics::new,
                    (sn, resList, stats) -> stats.addResources(resList).updateSN(sn),
                    Materialized.with(Serdes.String(), ArchiveSerdes.resourceStatistics()))
                // Hold back updates for timeToWait, buffering at most bufferMaxBytes.
                .suppress(Suppressed.untilTimeLimit(timeToWait, Suppressed.BufferConfig.maxBytes(bufferMaxBytes)));

And I find that the key of a record in KTABLE-SUPPRESS-STATE-STORE is something like 1234567j�P, which is not readable, but I guess it's generated by combining the SN and the window. I think this will make KTABLE-SUPPRESS-STATE-STORE redundant, because each SN will have multiple records, one for each window.

I have two questions:

  1. Does the OutOfMemoryError indicate that the heap size is too small? If so, how can I limit the rate? If not, what does it mean?
  2. Which API defines the key for KTABLE-SUPPRESS-STATE-STORE, and how can I control it (or should I)?

Thanks!

Edit on 2019/4/16:

The error stack trace is:

java.lang.OutOfMemoryError: Java heap space        
        at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
        at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)        
        at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)        
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)        
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:381)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:342)        
        at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)        
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:467)        
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)        
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1243)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1164)
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:88)
        at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:321)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:839)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:747)

Answer

Does the OutOfMemoryError indicate that the heap size is too small? If so, how can I limit the rate? If not, what does it mean?

Yes, there's not enough heap to allocate all the memory that the application needs to operate. We don't see this very often, and the suppression operator is new, so I'm suspicious of it, but it's good to bear in mind that basically any data structure in your application could be responsible.

The best way to diagnose memory pressure is to do a "heap dump". This basically copies your JVM's entire memory into a file, so that you can analyse its contents using a program like https://www.eclipse.org/mat/ . It'll be a little bit of a learning curve, but I think you'll find that some facility with analyzing memory usage is very handy in general.

You can trigger a heap dump at any time (there are several ways to do it, you'll have to research the best way for you). But I think you'll want to make use of Java's nifty option to do a heap dump when it gets an out-of-memory error. This way, you're more likely to positively identify the culprit. See https://docs.oracle.com/javase/7/docs/webnotes/tsg/TSG-VM/html/clopts.html#gbzrr , or similar for your JVM.
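
For reference (on a standard HotSpot JVM; adjust the path for your environment), the relevant flags are -XX:+HeapDumpOnOutOfMemoryError and, optionally, -XX:HeapDumpPath=/path/of/your/choice. With those set, the JVM writes an .hprof file at the moment the OutOfMemoryError is thrown, which you can open directly in Eclipse MAT.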

I can speculate about the cause of the heap dump, but I'm afraid I might just lead you astray and waste your time. Once you have the results of the dump, I think you should go ahead and open a bug report in the Kafka issue tracker: https://issues.apache.org/jira/projects/KAFKA . Then, we can help figure out both how to work around the bug to get you running again, and also how to fix it in future releases.

Actually, I will offer one speculation... It's possible you're seeing a result of this bug: https://github.com/apache/kafka/pull/6536 (https://issues.apache.org/jira/browse/KAFKA-7895) . If your OOME goes away when you remove the suppression operator, you might want to leave it out for now. As soon as we merge the fix, I'll request a bugfix release, and you can try again to see if the problem is resolved.

Which API defines the key for KTABLE-SUPPRESS-STATE-STORE, and how can I control it (or should I)?

Fortunately, this has a more straightforward answer. The key you're looking at is a binary-packed version of your record key and the timestamp of the window. This key is a result of your usage of windowedBy. In Java, you can see that the result of the aggregation is a KTable<Windowed<String>, ...>, and Suppress doesn't change the key or value type. In other words, you're looking at a serialized version of the key (Windowed<String>).
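
If you want to sanity-check that interpretation, here is a minimal sketch of my own (not part of the Kafka Streams API) that decodes such a key under the assumption that it is simply the raw record key bytes followed by an 8-byte big-endian window-start timestamp. The real changelog layout is an internal detail and may contain extra fields, so treat this as illustrative only. (Kafka Streams also ships windowed serdes, see org.apache.kafka.streams.kstream.WindowedSerdes, if you'd rather let the library do the decoding.)

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;

public class WindowedKeyDecoder {

    // Assumed layout: [record key bytes][8-byte big-endian window start].
    // This mirrors the "key + window timestamp" packing described above,
    // but the actual changelog format may differ.
    public static void decode(byte[] rawKey) {
        ByteBuffer buf = ByteBuffer.wrap(rawKey);

        byte[] keyBytes = new byte[rawKey.length - Long.BYTES];
        buf.get(keyBytes);
        long windowStartMs = buf.getLong();

        String serialNumber = new String(keyBytes, StandardCharsets.UTF_8);
        System.out.println("key=" + serialNumber
                + ", window start=" + Instant.ofEpochMilli(windowStartMs));
    }

    public static void main(String[] args) {
        // Build a sample key for serial number "1234567" and a window
        // starting at 2019-04-16T09:00:00Z, then decode it.
        byte[] sn = "1234567".getBytes(StandardCharsets.UTF_8);
        ByteBuffer sample = ByteBuffer.allocate(sn.length + Long.BYTES);
        sample.put(sn).putLong(Instant.parse("2019-04-16T09:00:00Z").toEpochMilli());
        decode(sample.array());
    }
}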

Leaving suppression aside for a second: let's say you have two serial numbers, "asdf" and "zxcv", and that your window size is one hour. Your application is grouping events for each of those serial numbers (independently) in each hour of the day. So there's an aggregation for all the "asdf" records from 9:00 to 10:00, and there's also one for all the "zxcv" records from 9:00 to 10:00. Thus, the total number of keys in the windowed KTable is (key space) × (number of windows being retained).
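
To put numbers on it (purely illustrative figures, not taken from your setup): with 1,000 distinct serial numbers, one-hour windows, and 24 hours of retention, the windowed KTable can hold up to 1,000 × 24 = 24,000 windowed keys at any one time.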

The Suppression operator will have no effect on the number of keys in the KTable. Its purpose is to suppress updates to those keys for a specified amount of time (timeToWait). For example, without suppress, if you get 3 updates to the "asdf" record between 9:00 and 10:00, the windowed aggregation will emit an updated result for (asdf, 9:00) each time, so for 3 events in, you see 3 result updates coming out. The Suppress operator just prevents those result updates until timeToWait has passed, and when it does pass, it emits only the most recent update.
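
To make the contrast concrete, here is a minimal sketch (hypothetical topic names, and a simple count instead of your aggregation) that emits the same windowed table both with and without suppression:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class SuppressSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Count events per serial number in one-hour windows.
        KTable<Windowed<String>, Long> counts = builder
                .stream("events", Consumed.with(Serdes.String(), Serdes.String()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(Duration.ofHours(1)))
                .count();

        // Without suppress: every input record produces a downstream update,
        // so 3 events for ("asdf", 9:00-10:00) yield 3 updates.
        counts.toStream((windowedKey, count) ->
                        windowedKey.key() + "@" + windowedKey.window().start())
                .to("all-updates", Produced.with(Serdes.String(), Serdes.Long()));

        // With suppress: updates are buffered for timeToWait (5 minutes here),
        // and only the most recent value per key is emitted when it elapses.
        counts.suppress(Suppressed.untilTimeLimit(Duration.ofMinutes(5),
                        Suppressed.BufferConfig.maxRecords(10_000)))
                .toStream((windowedKey, count) ->
                        windowedKey.key() + "@" + windowedKey.window().start())
                .to("suppressed-updates", Produced.with(Serdes.String(), Serdes.Long()));

        System.out.println(builder.build().describe());
    }
}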

So, the number of keys in the suppression buffer at any time is smaller than the total number of keys in the upstream KTable. It just contains the keys that have been updated in the last timeToWait amount of time.

Does that help?
