Kafka Streams - reducing the memory footprint for large state stores


Question


I have a topology (see below) that reads off a very large topic (over a billion messages per day). The memory usage of this Kafka Streams app is pretty high, and I was looking for some suggestions on how I might reduce the footprint of the state stores (more details below). Note: I am not trying to scapegoat the state stores, I just think there may be a way for me to improve my topology - see below.

// stream receives 1 billion+ messages per day
stream
    .flatMap((key, msg) -> rekeyMessages(msg))
    .groupBy((key, value) -> key)
    .reduce(new MyReducer(), MY_REDUCED_STORE)
    .toStream()
    .to(OUTPUT_TOPIC);

// stream the compacted topic as a KTable
KTable<String, String> rekeyedTable = builder.table(OUTPUT_TOPIC, REKEYED_STORE);


// aggregation 1
rekeyedTable.groupBy(...).aggregate(...)

// aggregation 2
rekeyedTable.groupBy(...).aggregate(...)

// etc


More specifically, I'm wondering if streaming the OUTPUT_TOPIC as a KTable is causing the state store (REKEYED_STORE) to be larger than it needs to be locally. For changelog topics with a large number of unique keys, would it be better to stream these as a KStream and do windowed aggregations? Or would that not reduce the footprint like I think it would (e.g., only a subset of the records, those in the current window, would exist in the local state store)?


Anyway, I can always spin up more instances of this app, but I'd like to make each instance as efficient as possible. Here are my questions:

  • Are there any configuration options, general strategies, etc. that should be considered for Kafka Streams apps with this level of throughput?
  • Are there any guidelines for how memory-intensive a single instance should be? Even if you only have an arbitrary guideline, it may be helpful to share it with others. One of my instances is currently utilizing 15GB of memory - I have no idea if that's good/bad/irrelevant.

Any help is greatly appreciated!

Answer

With your current pattern

stream.....reduce().toStream().to(OUTPUT_TOPIC);
builder.table(OUTPUT_TOPIC, REKEYED_STORE)


you get two stores with the same content: one for the reduce() operator and one for reading the table(). This can be reduced to a single store, though:

KTable<String, String> rekeyedTable = stream.....reduce(new MyReducer(), MY_REDUCED_STORE);
rekeyedTable.toStream().to(OUTPUT_TOPIC); // in case you need this output topic; otherwise you can omit it completely
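Putting this suggestion together with the original topology, the combined flow might look like the following sketch. Here rekeyMessages, MyReducer, MY_REDUCED_STORE, and OUTPUT_TOPIC are the placeholders from the question, and the trailing ... calls remain elided exactly as in the original post:

```java
// Single-store variant: the reduce() result is used directly as the KTable,
// so OUTPUT_TOPIC no longer needs to be re-materialized into a second store.
KTable<String, String> rekeyedTable = stream
    .flatMap((key, msg) -> rekeyMessages(msg))
    .groupBy((key, value) -> key)
    .reduce(new MyReducer(), MY_REDUCED_STORE);   // the only state store

// Write out the compacted result only if downstream consumers need it.
rekeyedTable.toStream().to(OUTPUT_TOPIC);

// Downstream aggregations read the reduce() result directly.
rekeyedTable.groupBy(...).aggregate(...);  // aggregation 1
rekeyedTable.groupBy(...).aggregate(...);  // aggregation 2
```

The key design point: builder.table(OUTPUT_TOPIC, ...) would have created a second materialized copy of data the application already holds in MY_REDUCED_STORE, so reusing the reduce() result halves the local state for this part of the topology.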


This should reduce your memory usage notably.

Regarding windowed vs. non-windowed:


  1. It's a matter of your required semantics, so simply switching from a non-windowed to a windowed reduce seems questionable.


Even if you can also go with windowed semantics, you would not necessarily reduce memory. Note that in the aggregation case, Streams does not store the raw records but only the current aggregate result (i.e., key + currentAgg). Thus, for a single key, the storage requirement is the same for both cases (a single window has the same storage requirement). At the same time, if you go with windows, you might actually need more memory, as you get one aggregate per key per window (while you get just a single aggregate per key in the non-windowed case). The only scenario in which you might save memory is one where your "key space" is spread out over a long period of time. For example, you might not get any input records for some keys for a long time. In the non-windowed case, the aggregates for those keys are stored forever, while in the windowed case the key/aggregate record will be dropped and a new entry will be re-created if records with that key occur again later (but keep in mind that you lose the previous aggregate in this case -- cf. (1)).
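If windowed semantics do fit the use case, a hypothetical windowed variant could look like the sketch below. The one-hour window, two-hour retention, and the TimeWindows/Materialized API shape are assumptions for illustration (older Streams releases expressed retention via Windows#until instead); rekeyMessages and MyReducer are the placeholders from the question:

```java
// Hypothetical windowed reduce: one aggregate per key per window.
// Expired windows are eventually purged, which is where the memory
// saving for a slowly-cycling key space would come from.
KTable<Windowed<String>, String> windowedTable = stream
    .flatMap((key, msg) -> rekeyMessages(msg))
    .groupBy((key, value) -> key)
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
    .reduce(new MyReducer(),
            Materialized.<String, String, WindowStore<Bytes, byte[]>>as("rekeyed-windowed-store")
                        .withRetention(Duration.ofHours(2))); // must be >= window size + grace
```

As point (1) above warns, this changes the semantics: a key that reappears after its window has been purged starts from a fresh aggregate rather than continuing the old one.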


Last but not least, you might want to have a look into the guidelines for sizing an application: http://docs.confluent.io/current/streams/sizing.html
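Beyond topology changes, a large share of resident memory in a Streams app typically comes from RocksDB itself (block cache and memtables per store, on top of the JVM heap). One way to bound it is a custom RocksDBConfigSetter. The sketch below is illustrative only: the class name and all sizes are made-up values, not tuned recommendations, and some RocksDB option methods (e.g. setBlockCacheSize) have changed across RocksDB versions:

```java
// Hypothetical config setter; sizes are placeholders, not recommendations.
public class BoundedMemoryRocksDBConfig implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName,
                          final org.rocksdb.Options options,
                          final Map<String, Object> configs) {
        // Fewer/smaller memtables per store.
        options.setMaxWriteBufferNumber(2);
        options.setWriteBufferSize(8 * 1024 * 1024L);   // 8 MB, illustrative

        // Smaller block cache per store.
        final org.rocksdb.BlockBasedTableConfig tableConfig =
                new org.rocksdb.BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(16 * 1024 * 1024L); // 16 MB, illustrative
        options.setTableFormatConfig(tableConfig);
    }
}

// Registered via the Streams configuration:
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
          BoundedMemoryRocksDBConfig.class);
```

Keep in mind these limits apply per state store, and an instance hosts one store per assigned task/partition, so total RocksDB memory scales with the number of partitions the instance owns.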

