How to test a WindowStore retention period?
Question
I'm trying to deduplicate incoming Kafka messages. I'm polling a data source that makes all the data points of a given day available the next day, but at an inconsistent time, so I poll every x minutes and want to deduplicate the data points to get a clean downstream topic containing only the new points.
For that I've built a custom transformer that relies on a store to keep track of which "points" have already been processed. As the data point's datetime is part of the deduplication key, I have an unbounded set of keys, so I cannot rely on a simple KeyValueStore. It's my understanding that a WindowStore would allow me to keep only the keys for a specific retention period (2 days, in my case), so that's what I'm using.
I tried to test the deduplication using kafka-streams-test-utils. The deduplication works well enough, but the WindowStore does not seem to "forget" the keys. I tried with a shorter window size and retention period (1s each), but I'm still not able to make it forget the keys/values that are past the retention period.
Configuration of the store: I expect objects to stay in the store for about 2 seconds
config.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,"1");
...
final StoreBuilder<WindowStore<String, AvroBicycleCount>> deduplicationStoreBuilder = Stores.windowStoreBuilder(
    // arguments: store name, retention period, window size, retainDuplicates
    Stores.persistentWindowStore(deduplicationStore, Duration.ofSeconds(1), Duration.ofSeconds(1), false),
    Serdes.String(),
    StreamUtils.AvroSerde()
);
My transformer logic
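@Override
public DataPoint transform(final String dataId, final DataPoint incoming) {
    // The deduplication key combines the data id and the point's datetime
    String key = dataId + "_" + incoming.getDateTime();
    DataPoint previous = windowStore.fetch(key, incoming.getDateTime());
    if (previous != null) {
        // Already seen: drop the record
        return null;
    }
    // First occurrence: remember it and forward it downstream
    windowStore.put(key, incoming, incoming.getDateTime());
    return incoming;
}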
The third test fails
inputTopic.pipeInput("a", newDataPoint);
assertEquals(1, outputTopic.readRecordsToList().size(), "When a new data is emitted, it should go through");
inputTopic.pipeInput("a", newDataPoint);
assertEquals(0, outputTopic.readRecordsToList().size(), "When the same data is re-emitted, it should not go through");
TimeUnit.SECONDS.sleep(10);
inputTopic.pipeInput("a", newDataPoint);
assertEquals(1, outputTopic.readRecordsToList().size(), "When the same data is re-emitted well past the retention period, it should go through");
Is there something I'm not understanding correctly about the WindowStore's retention?
Recommended answer
A WindowStore uses so-called segments internally to expire data. That is, the time range covered by your retention time is split into smaller time ranges, and there is one segment per time range to store the corresponding data (internally, a segment maps to a store, i.e., a WindowStore is actually multiple stores internally). Once all records in a segment have expired, the whole segment is dropped by deleting the corresponding store (this is more efficient than record-by-record expiration).
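To make the segment idea concrete, here is a minimal sketch of a toy segmented store in plain Java. It is a simplified model, not Kafka Streams' actual implementation: the class name SegmentedStoreSketch and all of its fields and methods are hypothetical, and it assumes that expiry is driven by the highest timestamp written to the store rather than by the wall clock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of segment-based expiration (hypothetical, not the Kafka Streams code). */
public class SegmentedStoreSketch {
    private final long retentionMs;
    private final long segmentIntervalMs;
    // segment id -> entries whose timestamps fall into that segment's time range
    private final TreeMap<Long, Map<String, String>> segments = new TreeMap<>();
    // highest timestamp written so far (assumption: expiry follows this, not the wall clock)
    private long observedStreamTime = Long.MIN_VALUE;

    public SegmentedStoreSketch(long retentionMs, long segmentIntervalMs) {
        this.retentionMs = retentionMs;
        this.segmentIntervalMs = segmentIntervalMs;
    }

    public void put(String key, String value, long timestampMs) {
        observedStreamTime = Math.max(observedStreamTime, timestampMs);
        long minLiveSegment = segmentId(observedStreamTime - retentionMs);
        // Drop every segment that lies entirely before the retention boundary
        segments.headMap(minLiveSegment, false).clear();
        long id = segmentId(timestampMs);
        if (id >= minLiveSegment) {
            segments.computeIfAbsent(id, k -> new HashMap<>())
                    .put(key + "@" + timestampMs, value);
        }
    }

    public String fetch(String key, long timestampMs) {
        Map<String, String> segment = segments.get(segmentId(timestampMs));
        return segment == null ? null : segment.get(key + "@" + timestampMs);
    }

    private long segmentId(long timestampMs) {
        return Math.floorDiv(timestampMs, segmentIntervalMs);
    }

    public static void main(String[] args) {
        // 1 second retention, 60 second segments
        SegmentedStoreSketch store = new SegmentedStoreSketch(1_000L, 60_000L);
        store.put("a", "first", 0L);
        store.put("b", "second", 10_000L);
        // "a" is 10 s old but its segment is still live
        System.out.println(store.fetch("a", 0L));
        store.put("c", "third", 130_000L);
        // the segment holding "a" has now been dropped as a whole
        System.out.println(store.fetch("a", 0L));
    }
}
```

In this model a record that is already past the retention period stays readable until a later write moves the retention boundary beyond the end of its segment, at which point the whole segment disappears at once.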
Also, there is a hard-coded minimum segment size of 60 seconds, and the number of segments is hard-coded to 2, to avoid segments that are too small (and inefficient). Thus, for your case of a 2-day retention time, you get two segments with a time range of 1 day each, and data (at the beginning of a segment) can be up to 3 days old before the old segment is dropped.
Thus, data is effectively deleted with some delay. You cannot configure the number of segments.
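The arithmetic behind that delay can be sketched as follows. This is only an illustration of the numbers given in this answer (two segments, 60-second minimum segment size); the class and method names are hypothetical and are not part of the Kafka Streams API.

```java
import java.time.Duration;

/** Illustrative arithmetic only; names are hypothetical, constants come from the answer text. */
public class SegmentIntervalSketch {
    public static final long MIN_SEGMENT_INTERVAL_MS = 60_000L; // 60 second minimum
    public static final int NUM_SEGMENTS = 2;                   // fixed number of segments

    /** Time range covered by one segment for a given retention period. */
    public static long segmentIntervalMs(Duration retention) {
        return Math.max(retention.toMillis() / NUM_SEGMENTS, MIN_SEGMENT_INTERVAL_MS);
    }

    /** Oldest a record can get before its segment is dropped: retention plus one segment. */
    public static long worstCaseAgeMs(Duration retention) {
        return retention.toMillis() + segmentIntervalMs(retention);
    }

    public static void main(String[] args) {
        Duration twoDays = Duration.ofDays(2);
        Duration oneSecond = Duration.ofSeconds(1);
        System.out.println(Duration.ofMillis(segmentIntervalMs(twoDays)));   // PT24H
        System.out.println(Duration.ofMillis(worstCaseAgeMs(twoDays)));      // PT72H
        System.out.println(Duration.ofMillis(segmentIntervalMs(oneSecond))); // PT1M
        System.out.println(Duration.ofMillis(worstCaseAgeMs(oneSecond)));    // PT1M1S
    }
}
```

Under these assumptions, the 1-second retention used in the test still yields 60-second segments, so a record could remain visible for roughly a minute past its timestamp rather than about 2 seconds.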