How to unit test a Kafka Streams application that uses a session window
Problem description
I am working with Kafka Streams 2.1.
I am trying to write some tests for a stream application that aggregates events by their key (i.e. by a correlation ID) using a session window with an inactivity gap of 300 ms.
Here is the aggregation implementation, represented by a method:
private static final int INACTIVITY_GAP = 300;

public KStream<String, AggregatedCustomObject> aggregate(KStream<String, CustomObject> source) {
    return source
        // group by key (i.e. by correlation ID)
        .groupByKey(Grouped.with(Serdes.String(), new CustomSerde()))
        // define a session window with an inactivity gap of 300 ms
        .windowedBy(SessionWindows.with(Duration.ofMillis(INACTIVITY_GAP)).grace(Duration.ofMillis(INACTIVITY_GAP)))
        .aggregate(
            // initializer
            () -> new AggregatedCustomObject(),
            // aggregates records in the same session
            (s, customObject, aggCustomObject) -> {
                // ...
                return aggCustomObject;
            },
            // merges sessions
            (s, aggCustomObject1, aggCustomObject2) -> {
                // ...
                return aggCustomObject2;
            },
            Materialized.with(Serdes.String(), new AggCustomObjectSerde())
        )
        // emit only the final result once the window has closed
        .suppress(Suppressed.untilWindowCloses(unbounded()))
        .toStream()
        .selectKey((stringWindowed, aggCustomObject) -> "someKey");
}
This stream process works as expected. But for unit tests, that's a different story.
My test stream configuration is as follows:
// ...
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, myCustomObjectSerde.getClass());
// disable cache
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
// commit ASAP
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
StreamsBuilder builder = new StreamsBuilder();
aggregate(builder.stream(INPUT_TOPIC), OUTPUT_TOPIC, new AggCustomObjectSerde())
.to(OUTPUT_TOPIC);
Topology topology = builder.build();
TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
ConsumerRecordFactory<String, MyCustomObject> factory = new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), myCustomSerializer);
// ...
One of the tests looks like this:
List<ConsumerRecord<byte[], byte[]>> records = myCustomMessages.stream()
    .map(myCustomMessage -> factory.create(INPUT_TOPIC, myCustomMessage.correlationId, myCustomMessage))
    .collect(Collectors.toList());
testDriver.pipeInput(records);
ProducerRecord<String, AggregatedCustomMessage> record = testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), myAggregatedCustomObjectSerde);
The problem is that record is always null.
I tried a lot of things:
- read in a loop with a timeout
- change the commit interval in the config so that results are committed as soon as possible
- send an additional record with a different key right afterwards (to trigger the window closing, since in Kafka Streams event time is based on record timestamps)
- call the advanceWallClockTime method of the test driver
Well, nothing helps. Could someone tell me what I am missing, and how I should test a stream application based on session windows?
Thanks a lot.
Recommended answer
SessionWindows work with event time, not wall-clock time. Try setting the event time of your records so that they simulate the inactivity gap. Something like:
testDriver.pipeInput(factory.create(INPUT_TOPIC, key1, record1, eventTimeMs));
testDriver.pipeInput(factory.create(INPUT_TOPIC, key2, record2, eventTimeMs + inactivityGapMs));
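To illustrate why advancing the wall clock has no effect while a later record timestamp does, here is a minimal plain-Java sketch of event-time session handling. It is not Kafka Streams code: the class SessionWindowModel, its fields, and the closing rule (session end + gap + grace <= stream time) are assumptions made for illustration, and the exact boundary Kafka Streams applies may differ between versions. Dropping of late records is not modeled.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified model of event-time session windows (not Kafka Streams code).
class SessionWindowModel {
    private static final class Session {
        final String key;
        long start;
        long end;
        int count;

        Session(String key, long timestampMs) {
            this.key = key;
            this.start = timestampMs;
            this.end = timestampMs;
            this.count = 1;
        }
    }

    private final long gapMs;
    private final long graceMs;
    // Stream time: the highest record timestamp seen so far. The wall clock is never read.
    private long streamTime = Long.MIN_VALUE;
    private final List<Session> open = new ArrayList<>();

    SessionWindowModel(long gapMs, long graceMs) {
        this.gapMs = gapMs;
        this.graceMs = graceMs;
    }

    // Processes one record and returns the sessions that this record caused to close.
    List<String> process(String key, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        List<String> closed = new ArrayList<>();
        Session merged = new Session(key, timestampMs);
        Iterator<Session> it = open.iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (s.end + gapMs + graceMs <= streamTime) {
                // Assumed closing rule of this model: any key's record can advance stream time.
                closed.add(s.key + "@[" + s.start + "," + s.end + "] count=" + s.count);
                it.remove();
            } else if (s.key.equals(key)
                    && timestampMs >= s.start - gapMs
                    && timestampMs <= s.end + gapMs) {
                // Same key and within the inactivity gap: merge into one session.
                merged.start = Math.min(merged.start, s.start);
                merged.end = Math.max(merged.end, s.end);
                merged.count += s.count;
                it.remove();
            }
        }
        open.add(merged);
        return closed;
    }

    public static void main(String[] args) {
        SessionWindowModel model = new SessionWindowModel(300, 300);
        model.process("a", 1000);
        model.process("a", 1200);
        System.out.println(model.process("b", 1700)); // []
        System.out.println(model.process("b", 1800)); // [a@[1000,1200] count=2]
    }
}
```

In this model the session for key a is only returned once a record with a timestamp of 1800 or later arrives, regardless of how much real time passes. For a test, that suggests piping a final record whose timestamp lies comfortably past the inactivity gap plus the grace period.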
But first, you need a custom TimestampExtractor, like:
public static class RecordTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        return record.timestamp();
    }
}
It has to be registered like this:
streamProperties.setProperty(
    StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
    RecordTimestampExtractor.class.getName()
);