How to test a WindowStore retention period?


Problem description


I'm trying to deduplicate incoming Kafka messages (I'm polling a data source that makes all the data points of a given day available the next day, but at an inconsistent time, so I'm polling every x minutes and I want to deduplicate the data points to get a clean downstream topic containing only the new points).

For that I've built a custom transformer that relies on a store to keep track of which "points" have already been processed. As the data point's datetime is part of the deduplication key, I have an unbounded set of keys, so I cannot rely on a simple KeyValueStore. My understanding is that a WindowStore would allow me to keep keys only for a specific retention period (2 days, in my case), so that's what I'm using.

I tried to test the deduplication using kafka-streams-test-utils. The deduplication works well enough, but the windowStore does not seem to "forget" the keys. I tried with a shorter window size and retention (1 s), but I'm still not able to have it forget the keys/values that are past the retention period.

Configuration of the store: I expect objects to stay in the store for ~2 seconds

config.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, "1");
...
final StoreBuilder<WindowStore<String, AvroBicycleCount>> deduplicationStoreBuilder = Stores.windowStoreBuilder(
        Stores.persistentWindowStore(deduplicationStore, Duration.ofSeconds(1), Duration.ofSeconds(1), false),
        Serdes.String(),
        StreamUtils.AvroSerde()
);
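For comparison, a store definition that targets the two-day retention the question actually wants would look roughly like the following sketch. It reuses the `deduplicationStore` name and `StreamUtils.AvroSerde()` helper from the question; the key point is that the first `Duration` of `persistentWindowStore` is the retention period and the second is the window size, which are independent settings:

```java
// Sketch (not the author's code): same builder as above, but with a 2-day retention.
final StoreBuilder<WindowStore<String, AvroBicycleCount>> deduplicationStoreBuilder =
        Stores.windowStoreBuilder(
                Stores.persistentWindowStore(
                        deduplicationStore,     // store name, as in the question
                        Duration.ofDays(2),     // retention period: how long keys are kept
                        Duration.ofSeconds(1),  // window size, independent of retention
                        false),                 // do not retain duplicates
                Serdes.String(),
                StreamUtils.AvroSerde());
```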

My transformer logic:

@Override
public DataPoint transform(final String dataId, final DataPoint incoming) {
    // The point's datetime is part of the key, so each datetime of a given id is tracked separately.
    String key = dataId + "_" + incoming.getDateTime();
    // A non-null hit at this timestamp means the point was already forwarded once.
    DataPoint previous = windowStore.fetch(key, incoming.getDateTime());
    if (previous != null)
        return null;

    windowStore.put(key, incoming, incoming.getDateTime());
    return incoming;
}

The third test fails

inputTopic.pipeInput("a", newDataPoint);
assertEquals(1, outputTopic.readRecordsToList().size(), "When a new data is emitted, it should go through");

inputTopic.pipeInput("a", newDataPoint);
assertEquals(0, outputTopic.readRecordsToList().size(), "When the same data is re-emitted, it should not go through");

TimeUnit.SECONDS.sleep(10);

inputTopic.pipeInput("a", newDataPoint);
assertEquals(1, outputTopic.readRecordsToList().size(), "When the same data is re-emitted well past the retention period, it should go through");

Is there something I'm not understanding correctly about the windowStore's retention?

Solution

A WindowedStore internally uses so-called segments to expire data. I.e., the retention time range is split into smaller time ranges, and there is a segment for each time range to store the corresponding data (internally, a segment maps to a store, i.e., a WindowedStore is actually multiple stores internally). If all records in a segment are expired, the whole segment is dropped by deleting the corresponding store (this is more efficient than record-by-record expiration).

Also, there is a minimum (hard-coded) segment size of 60 seconds, and the number of segments is 2 (hard-coded), to avoid too-small (and inefficient) segments. Thus, for your case of a 2-day retention time, you get two segments with a time range of 1 day each. Hence, data (at the beginning of a segment) can be up to 3 days old before the old segment is dropped.
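A quick sketch of that arithmetic in plain Java (the 60-second minimum and the segment count of 2 come from the paragraph above; `SegmentMath` and `segmentIntervalMs` are illustrative names, not the actual Streams internals):

```java
// Illustrative sketch of how the effective segment interval is derived.
// The constants are the hard-coded values described above; the class and
// method names are made up for this example.
public class SegmentMath {
    static final long MIN_SEGMENT_INTERVAL_MS = 60_000L; // hard-coded minimum segment size
    static final int NUM_SEGMENTS = 2;                   // hard-coded segment count

    static long segmentIntervalMs(long retentionMs) {
        return Math.max(retentionMs / NUM_SEGMENTS, MIN_SEGMENT_INTERVAL_MS);
    }

    public static void main(String[] args) {
        // A 1-second retention is rounded up to a 60-second segment, so keys
        // can survive far longer than the configured 1 second.
        System.out.println(segmentIntervalMs(1_000L));        // 60000
        // The 2-day case: two 1-day segments.
        System.out.println(segmentIntervalMs(172_800_000L));  // 86400000
    }
}
```

This is consistent with the failing third test: with a 1-second retention, the smallest possible segment still spans a full minute, so a 10-second pause is not enough for the store to drop the key.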

Thus, data is effectively deleted with some delay. You cannot configure the number of segments.
