Kafka Streams - reducing the memory footprint for large state stores


Question


I have a topology (see below) that reads off a very large topic (over a billion messages per day). The memory usage of this Kafka Streams app is pretty high, and I was looking for some suggestions on how I might reduce the footprint of the state stores (more details below). Note: I am not trying to scapegoat the state stores, I just think there may be a way for me to improve my topology - see below.

// stream receives 1 billion+ messages per day
stream
    .flatMap((key, msg) -> rekeyMessages(msg))
    .groupBy((key, value) -> key)
    .reduce(new MyReducer(), MY_REDUCED_STORE)
    .toStream()
    .to(OUTPUT_TOPIC);

// stream the compacted topic as a KTable
KTable<String, String> rekeyedTable = builder.table(OUTPUT_TOPIC, REKEYED_STORE);


// aggregation 1
rekeyedTable.groupBy(...).aggregate(...)

// aggregation 2
rekeyedTable.groupBy(...).aggregate(...)

// etc


More specifically, I'm wondering if streaming the OUTPUT_TOPIC as a KTable is causing the state store (REKEYED_STORE) to be larger than it needs to be locally. For changelog topics with a large number of unique keys, would it be better to stream these as a KStream and do windowed aggregations? Or would that not reduce the footprint like I think it would (i.e., only a subset of the records - those in the window - would exist in the local state store)?


Anyway, I can always spin up more instances of this app, but I'd like to make each instance as efficient as possible. Here are my questions:

  • Are there any configuration options, general strategies, etc. that I should consider for a Kafka Streams app with this level of throughput?
  • Are there any guidelines for how memory-intensive a single instance should be? Even if you only have somewhat arbitrary guidelines, it may be helpful to share them. One of my instances is currently using 15GB of memory - I have no idea whether that's good, bad, or irrelevant.
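To put the memory question in perspective, a rough back-of-the-envelope estimate can be built from the RocksDB defaults that Kafka Streams has historically used (roughly a 50 MB block cache plus 3 x 16 MB memtables per store instance; these figures are assumptions and are tunable via a `RocksDBConfigSetter`, so treat this as a sketch, not a rule):

```java
public class RocksDbFootprint {

    /**
     * Rough off-heap estimate for RocksDB-backed stores on one instance,
     * assuming ~50 MB block cache + 3 x 16 MB memtables per store-partition.
     * These per-store numbers are assumptions, not guaranteed defaults.
     */
    static long estimateBytes(int stores, int partitionsPerInstance) {
        long perStorePartition = (50L + 3L * 16L) * 1024 * 1024; // ~98 MiB
        return (long) stores * partitionsPerInstance * perStorePartition;
    }

    public static void main(String[] args) {
        // e.g. 4 state stores, 32 partitions handled by this instance
        long mb = RocksDbFootprint.estimateBytes(4, 32) / (1024 * 1024);
        System.out.println(mb + " MB"); // prints "12544 MB" (~12.25 GB)
    }
}
```

Under those assumed defaults, an instance handling many store-partitions can plausibly reach the 15GB range from RocksDB off-heap memory alone, before counting JVM heap and record caches.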

Any help would be greatly appreciated!

Answer

With your current pattern

stream.....reduce(...).toStream().to(OUTPUT_TOPIC);
builder.table(OUTPUT_TOPIC, REKEYED_STORE);


you get two stores with the same content. One for the reduce() operator and one for reading the table() -- this can be reduced to one store though:

KTable rekeyedTable = stream.....reduce(...);
rekeyedTable.toStream().to(OUTPUT_TOPIC); // in case you need this output topic; otherwise you can also omit it completely


This should reduce your memory usage notably.


About windowing vs non-windowing:


  1. It's a matter of your required semantics; simply switching from a non-windowed to a windowed reduce seems questionable.


  2. Even if you can go with windowed semantics, you would not necessarily reduce memory. Note that in the aggregation case, Streams does not store the raw records but only the current aggregate result (i.e., key + currentAgg). Thus, for a single key, the storage requirement is the same for both cases (a single window has the same storage requirement). At the same time, if you go with windows, you might actually need more memory, as you get one aggregate per key per window (while you get just a single aggregate per key in the non-windowed case). The only scenario in which you might save memory is the case in which your 'key space' is spread out over a long period of time. For example, you might not get any input records for some keys for a long time. In the non-windowed case, the aggregates for those keys will be stored the whole time, while in the windowed case the key/agg record will be dropped and a new entry will be re-created if records with that key occur again later on (but keep in mind that you lose the previous aggregate in this case -- cf. (1)).
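The per-key-per-window arithmetic above can be illustrated with a plain-Java sketch (no Kafka involved; the keys and window count are made-up numbers used only to show how store entry counts differ between the two approaches):

```java
import java.util.HashMap;
import java.util.Map;

public class StoreFootprint {

    // Non-windowed aggregation: the store keeps one entry (key -> currentAgg)
    // per distinct key, no matter how many records arrive for that key.
    static int nonWindowedEntries(String[] keys) {
        Map<String, Long> store = new HashMap<>();
        for (String k : keys) store.merge(k, 1L, Long::sum);
        return store.size();
    }

    // Windowed aggregation: the store keeps one entry per key per retained
    // window, i.e. (key, windowStart) -> agg.
    static int windowedEntries(String[] keys, int retainedWindows) {
        Map<String, Long> store = new HashMap<>();
        for (String k : keys)
            for (int w = 0; w < retainedWindows; w++)
                store.merge(k + "@" + w, 1L, Long::sum);
        return store.size();
    }

    public static void main(String[] args) {
        String[] keys = {"a", "b", "c"};
        System.out.println(StoreFootprint.nonWindowedEntries(keys));  // prints 3
        System.out.println(StoreFootprint.windowedEntries(keys, 4));  // prints 12
    }
}
```

With every key active in every retained window, the windowed store holds keys x windows entries versus one entry per key for the non-windowed store; the windowed variant only wins when most keys are inactive long enough for their windows to expire.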


Last but not least, you might want to have a look at the guidelines for sizing an application: http://docs.confluent.io/current/streams/sizing.html
