Retention time in kafka local state store / changelog


Problem description


I'm using Kafka and Kafka Streams as part of Spring Cloud Stream. The data flowing through my Kafka Streams app is being aggregated and materialized over certain time windows:

Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> oneHour = Materialized.as("one-hour-store");
    oneHour.withLoggingEnabled(topicConfig);
    events
            .map(getStringSensorMeasurementKeyValueKeyValueMapper())
            .groupByKey()
            .windowedBy(TimeWindows.of(oneHourStore.getTimeUnit()))
            .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
                    (oneHour));

As designed, the information being materialized is also backed by a changelog topic.

Our app also has a REST endpoint that queries the state store like this:

ReadOnlyWindowStore<String, Double> windowStore =
        queryableStoreRegistry.getQueryableStoreType("one-hour-store", QueryableStoreTypes.windowStore());
WindowStoreIterator<ErrorScore> iter = windowStore.fetch(key, from, to);

Looking at the settings of the changelog topic that was created, it reads:

min.insync.replicas 1
cleanup.policy delete
retention.ms 5259600000
retention.bytes -1

I would assume that the local state store would keep the information for at least 61 days (~2 months). However, it seems that only about the last day of data remains in the stores.
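The ~61-day figure follows directly from the topic's retention.ms shown above; a quick sanity check of the arithmetic (plain JDK, values taken from the topic config):

```java
public class RetentionMath {
    public static void main(String[] args) {
        long retentionMs = 5_259_600_000L;     // retention.ms of the changelog topic
        double msPerDay = 86_400_000.0;        // 24 * 60 * 60 * 1000
        double days = retentionMs / msPerDay;  // 60.875 days, i.e. ~2 average months
        System.out.println(days);              // prints 60.875
    }
}
```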

What could cause the data being removed so soon?

Update with solution

Kafka Streams version 2.0.1 does not contain the Materialized.withRetention method. For this particular version I was able to set the retention time of the state stores using the following code, which solved my problem:

TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
    timeWindows.until(retentionMs);

making my code read like:

...
.groupByKey()
        .windowedBy(timeWindows)
        .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
                (oneHour));
...

Solution

For windowed KTables there is a local retention time and a changelog retention time. You can set the local store retention time via Materialized.withRetentionTime(...) -- the default value is 24 hours.
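This default is the likely cause of the behavior in the question: interactive queries read the local store, not the changelog, so the store's retention is what bounds fetch() results. A small sketch comparing the two retentions (using only java.time; the changelog value is the retention.ms from the question):

```java
import java.time.Duration;

public class RetentionMismatch {
    public static void main(String[] args) {
        // Default local store retention when nothing is set explicitly:
        Duration storeRetention = Duration.ofHours(24);
        // retention.ms of the changelog topic from the question:
        Duration changelogRetention = Duration.ofMillis(5_259_600_000L);

        // Only ~1 day is queryable locally even though ~60 days sit in the topic.
        System.out.println(storeRetention.toDays());     // 1
        System.out.println(changelogRetention.toDays()); // 60
    }
}
```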

For older Kafka releases, the local store retention time is set via Windows#until().

If a new application is created, changelog topics are created with the same retention time as the local store retention time. However, if you manually increase the changelog topic's retention time, this won't affect your store retention time; you need to update your code accordingly. The same is true when the changelog topic already exists: if you change the local store retention time, the changelog topic config is not updated automatically.
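Aligning an existing changelog topic's retention therefore has to be done by hand on the broker side. A sketch with the kafka-configs tool (the application id my-app, the broker address, and the retention value are assumptions; Kafka Streams names changelog topics <application.id>-<store-name>-changelog):

```shell
# Sketch: raise retention.ms on an existing changelog topic by hand.
# On older broker versions, use --zookeeper <host:2181> instead of --bootstrap-server.
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter \
  --entity-type topics \
  --entity-name my-app-one-hour-store-changelog \
  --add-config retention.ms=5259600000
```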

There is a Jira for this as well: https://issues.apache.org/jira/browse/KAFKA-7591
