How to test a WindowStore retention period?

Problem description

I'm trying to deduplicate incoming Kafka messages. I'm polling a data source that makes all the data points of a given day available the next day, but at an inconsistent time, so I poll every x minutes and want to deduplicate the data points to get a clean downstream topic containing only the new points.

For that, I've built a custom transformer that relies on a store to keep track of which points have already been processed. As the data point's datetime is part of the deduplication key, I have an unbounded set of keys, so I cannot rely on a simple KeyValueStore. My understanding is that a WindowStore would let me keep the keys only for a specific retention period (2 days, in my case), so that's what I'm using.
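To pin down the semantics being asked for, here is a minimal plain-Java sketch (not Kafka Streams code, and not the poster's implementation): a deduplicator that remembers each `dataId_datetime` key with its timestamp and forgets keys once they fall outside a retention period. The class name `TimeBoundedDeduplicator` is hypothetical, and treating the largest timestamp seen so far as "now" (rather than the wall clock) is an assumption of the sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical deduplicator that remembers keys only for a retention period.
// "Now" is the largest timestamp seen so far, not the wall clock.
final class TimeBoundedDeduplicator {
    private final long retentionMs;
    private final Map<String, Long> seen = new HashMap<>();
    private long streamTime = Long.MIN_VALUE;

    TimeBoundedDeduplicator(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Returns true if the point is new and should be forwarded downstream. */
    boolean accept(String dataId, long dateTime) {
        streamTime = Math.max(streamTime, dateTime);
        // Record-by-record expiry: forget every key older than the retention period
        seen.values().removeIf(ts -> ts < streamTime - retentionMs);
        String key = dataId + "_" + dateTime;
        if (seen.containsKey(key)) {
            return false; // duplicate within the retention period
        }
        seen.put(key, dateTime);
        return true;
    }

    int size() {
        return seen.size();
    }
}
```

This version expires keys one by one and scans the whole map on every call, which is simple but costly for large key sets; a WindowStore expires data in coarser chunks instead.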

I tried to test the deduplication using kafka-streams-test-utils. The deduplication works well enough, but the WindowStore does not seem to "forget" the keys. I tried a shorter window size and retention period (1 s), but I'm still not able to make it forget the keys/values that are past the retention period.

Store configuration (I expect objects to stay in the store for about 2 seconds):

// Extra retention added to the changelog topic of window stores (default: 1 day)
config.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, "1");
...
final StoreBuilder<WindowStore<String, AvroBicycleCount>> deduplicationStoreBuilder = Stores.windowStoreBuilder(
        // name, retention period (1 s), window size (1 s), retainDuplicates = false
        Stores.persistentWindowStore(deduplicationStore, Duration.ofSeconds(1), Duration.ofSeconds(1), false),
        Serdes.String(),
        StreamUtils.AvroSerde()
);

My transformer logic:

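@Override
public DataPoint transform(final String dataId, final DataPoint incoming) {
    // Deduplication key: data id plus the data point's own datetime
    String key = dataId + "_" + incoming.getDateTime();
    // Look up the key in the window that starts at the data point's datetime
    DataPoint previous = windowStore.fetch(key, incoming.getDateTime());
    if (previous != null) {
        return null; // already seen within the store's retention
    }

    windowStore.put(key, incoming, incoming.getDateTime());
    return incoming; // first occurrence: pass it on
}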

The third assertion fails:

// 1) A new data point goes through
inputTopic.pipeInput("a", newDataPoint);
assertEquals(1, outputTopic.readRecordsToList().size(), "When a new data point is emitted, it should go through");

// 2) The same data point sent again is deduplicated
inputTopic.pipeInput("a", newDataPoint);
assertEquals(0, outputTopic.readRecordsToList().size(), "When the same data point is re-emitted, it should not go through");

// 3) Wait longer than the configured retention, then re-send the same point
TimeUnit.SECONDS.sleep(10);

inputTopic.pipeInput("a", newDataPoint);
assertEquals(1, outputTopic.readRecordsToList().size(), "When the same data point is re-emitted well past the retention period, it should go through");

Is there something I'm not understanding correctly about the WindowStore's retention?

Answer

A WindowStore uses so-called segments internally to expire data. That is, the time range of your retention period is split into smaller time ranges, and there is a segment for each time range that stores the corresponding data (internally, a segment maps to a store, i.e., a WindowStore is actually multiple stores internally). Once all records in a segment are expired, the whole segment is dropped by deleting the corresponding store (this is more efficient than record-by-record expiration).
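The segment mechanism can be illustrated with a toy plain-Java model. It is a simplified sketch, not the actual Kafka Streams classes: each record goes into segment `timestamp / segmentInterval`, and expiry drops whole segments rather than individual records. The class `SegmentedStoreModel` is hypothetical, and, as an assumption of the model, "now" is the largest timestamp written so far, so cleanup only happens when a write advances it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical toy model of a segmented window store (not Kafka Streams code).
// "Now" is the largest timestamp written so far, not the wall clock.
final class SegmentedStoreModel {
    private final long retentionMs;
    private final long segmentIntervalMs;
    // segment id -> records; each inner map stands in for one physical store
    private final TreeMap<Long, Map<String, String>> segments = new TreeMap<>();
    private long streamTime = 0L;

    SegmentedStoreModel(long retentionMs, long segmentIntervalMs) {
        this.retentionMs = retentionMs;
        this.segmentIntervalMs = segmentIntervalMs;
    }

    private long segmentId(long timestamp) {
        return timestamp / segmentIntervalMs;
    }

    void put(String key, String value, long timestamp) {
        streamTime = Math.max(streamTime, timestamp);
        long minLiveSegment = segmentId(Math.max(0L, streamTime - retentionMs));
        // Expire whole segments at once: every segment older than the oldest live one
        segments.headMap(minLiveSegment).clear();
        long id = segmentId(timestamp);
        if (id >= minLiveSegment) {
            segments.computeIfAbsent(id, k -> new HashMap<>()).put(key + "@" + timestamp, value);
        }
    }

    String fetch(String key, long timestamp) {
        Map<String, String> segment = segments.get(segmentId(timestamp));
        return segment == null ? null : segment.get(key + "@" + timestamp);
    }

    int segmentCount() {
        return segments.size();
    }
}
```

With a 2 s retention and 1 s segments, a record written at timestamp 0 is still readable when the model's clock reaches 2.5 s, because its segment still overlaps the retention window; it disappears only when a write moves the clock to 3 s.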

Also, there is a minimum (hard-coded) segment size of 60 seconds, and the number of segments is hard-coded to 2, to avoid segments that are too small (and inefficient). Thus, for your case of a 2-day retention time, you get two segments with a time range of 1 day each. A segment can only be dropped once even its newest records are past the 2-day retention, so data at the beginning of a segment can be up to 3 days old before the old segment is dropped.
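The arithmetic above can be checked with a small helper. It assumes, following the answer, that the segment interval is half the retention period with a 60-second floor, and that a segment is dropped once stream time minus the retention period reaches the end of that segment; the class `SegmentMath` and its methods are hypothetical names for illustration.

```java
// Hypothetical helper reproducing the segment arithmetic (times in milliseconds).
final class SegmentMath {
    static final long MIN_SEGMENT_INTERVAL_MS = 60_000L;

    // Retention split into two segments, but never shorter than 60 seconds
    static long segmentInterval(long retentionMs) {
        return Math.max(retentionMs / 2, MIN_SEGMENT_INTERVAL_MS);
    }

    // Earliest stream time at which the segment holding `timestamp` is dropped:
    // the end of that segment plus the retention period
    static long droppedAt(long timestamp, long retentionMs) {
        long interval = segmentInterval(retentionMs);
        long segmentEnd = (timestamp / interval + 1) * interval;
        return segmentEnd + retentionMs;
    }
}
```

For a 2-day retention, a record at timestamp 0 is dropped at 3 days. For the 1-second retention used in the test, the 60-second floor applies, so a record at the start of a segment is only dropped at 61 seconds of stream time.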

Thus, data is effectively deleted with some delay. You cannot configure the number of segments.
