Retention time in Kafka local state store / changelog

Problem description

I'm using Kafka and Kafka Streams as part of Spring Cloud Stream. The data flowing through my Kafka Streams app is aggregated and materialized in time windows:

Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> oneHour = Materialized.as("one-hour-store");
oneHour.withLoggingEnabled(topicConfig);
events
        .map(getStringSensorMeasurementKeyValueKeyValueMapper())
        .groupByKey()
        .windowedBy(TimeWindows.of(oneHourStore.getTimeUnit()))
        .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue), oneHour);

By design, the materialized information is also backed by a changelog topic.

Our app also has a REST endpoint that queries the state store like this:

// The store's value type must match the iterator's value type (ErrorScore).
ReadOnlyWindowStore<String, ErrorScore> windowStore = queryableStoreRegistry.getQueryableStoreType("one-hour-store", QueryableStoreTypes.windowStore());
WindowStoreIterator<ErrorScore> iter = windowStore.fetch(key, from, to);

The settings of the changelog topic that gets created read:

min.insync.replicas 1
cleanup.policy delete
retention.ms 5259600000
retention.bytes -1

I would assume that the local state store keeps the information for at least 61 days (~2 months). However, it seems that only about the last day of data remains in the stores.
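As a quick sanity check on the "~2 months" figure, the retention.ms value from the topic settings can be converted with plain java.time arithmetic. This is only a sketch; the class and method names (RetentionMath, toWholeHours, toFractionalDays) are made up for illustration.

```java
import java.time.Duration;

// Hypothetical helper, for illustration only: converts a retention.ms value to hours and days.
class RetentionMath {
    // Whole hours contained in the given number of milliseconds.
    static long toWholeHours(long retentionMs) {
        return Duration.ofMillis(retentionMs).toHours();
    }

    // Days as a fraction, using 86,400,000 ms per day.
    static double toFractionalDays(long retentionMs) {
        return retentionMs / (double) Duration.ofDays(1).toMillis();
    }

    public static void main(String[] args) {
        long retentionMs = 5_259_600_000L; // value shown for the changelog topic
        System.out.println(toWholeHours(retentionMs));     // prints 1461
        System.out.println(toFractionalDays(retentionMs)); // prints 60.875
    }
}
```

So the changelog topic is configured for just under 61 days, which makes the roughly one day of data observed in the store stand out.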

What could cause the data to be removed so soon?

Update with solution: Kafka Streams version 2.0.1 does not contain the Materialized.withRetention method. For this particular version I was able to set the retention time of the state stores with the following code, which solved my problem:

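// Chain until() and keep its return value, so the retention is applied
// whether or not the TimeWindows instance is mutated in place.
TimeWindows timeWindows = TimeWindows.of(windowSizeMs).until(retentionMs);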

This makes my code read:

...

.groupByKey()
        .windowedBy(timeWindows)
        .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue),
                (oneHour));
...

Recommended answer

For windowed KTables there is a local store retention time and a changelog retention time. You can set the local store retention time via Materialized.withRetention(...); the default value is 24 hours.

For older Kafka releases, the local store retention time is set via Windows#until().
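To illustrate why a 24-hour local retention hides older windows no matter how long the changelog topic keeps its records, here is a toy model in plain Java. It is not Kafka's implementation: ToyWindowStore and its methods are hypothetical, and it simply drops every window whose start lies further behind the newest observed window start than the retention allows. Real window stores expire data in coarser segments, so the actual cut-off is less exact.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Toy model only (hypothetical class, not part of Kafka Streams).
class ToyWindowStore {
    private final long retentionMs;
    // Window start timestamp (ms) -> aggregated value, sorted by start time.
    private final TreeMap<Long, Double> windows = new TreeMap<>();
    // Highest window start seen so far; stands in for "stream time".
    private long streamTimeMs = Long.MIN_VALUE;

    ToyWindowStore(Duration retention) {
        this.retentionMs = retention.toMillis();
    }

    // Stores a window, then drops every window that starts before streamTime - retention.
    void put(long windowStartMs, double value) {
        streamTimeMs = Math.max(streamTimeMs, windowStartMs);
        windows.put(windowStartMs, value);
        windows.headMap(streamTimeMs - retentionMs, false).clear();
    }

    // Returns the values of all retained windows with start in [fromMs, toMs].
    List<Double> fetch(long fromMs, long toMs) {
        return new ArrayList<>(windows.subMap(fromMs, true, toMs, true).values());
    }

    public static void main(String[] args) {
        long hourMs = Duration.ofHours(1).toMillis();
        ToyWindowStore oneDay = new ToyWindowStore(Duration.ofHours(24));
        ToyWindowStore twoMonths = new ToyWindowStore(Duration.ofDays(61));
        // Write 72 one-hour windows (three days of data) into both stores.
        for (int h = 0; h < 72; h++) {
            oneDay.put(h * hourMs, h);
            twoMonths.put(h * hourMs, h);
        }
        System.out.println(oneDay.fetch(0, Long.MAX_VALUE).size());    // prints 25
        System.out.println(twoMonths.fetch(0, Long.MAX_VALUE).size()); // prints 72
    }
}
```

In this model the store with the 24-hour retention answers a three-day query with only the most recent day of windows, which matches the symptom described in the question.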

When a new application is created, its changelog topics are created with the same retention time as the local store retention time. However, if you manually increase the log retention time, this does not affect your store retention time; you need to update your code accordingly. The reverse also holds when the changelog topic already exists: if you change the local store retention time, the changelog topic config is not updated automatically.

There is a Jira for this as well: https://issues.apache.org/jira/browse/KAFKA-7591
