How to unit test a Kafka Streams application that uses session windows

Problem description

I am working with Kafka Streams 2.1.

I am trying to write some tests for a stream application that aggregates events by their key (i.e. by a correlation ID) using a session window with an inactivity gap of 300 ms.

Here is the aggregation, implemented as a method:

    private static final int INACTIVITY_GAP = 300;

    public KStream<String, AggregatedCustomObject> aggregate(KStream<String, CustomObject> source) {

        return source
                // group by key (i.e by correlation ID)
                .groupByKey(Grouped.with(Serdes.String(), new CustomSerde()))
                // Define a session window with an inactivity gap of 300 ms
                .windowedBy(SessionWindows.with(Duration.ofMillis(INACTIVITY_GAP)).grace(Duration.ofMillis(INACTIVITY_GAP)))
                .aggregate(
                        // initializer
                        () -> new AggregatedCustomObject(),
                        // aggregates records in same session
                        (s, customObject, aggCustomObject) -> {
                            // ...
                            return aggCustomObject;
                        },
                        // merge sessions
                        (s, aggCustomObject1, aggCustomObject2) -> {
                            // ...
                            return aggCustomObject2;
                        },
                        Materialized.with(Serdes.String(), new AggCustomObjectSerde())
                )
                // emit only the final result of each session, once the window has closed
                .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream()
                .selectKey((stringWindowed, aggCustomObject) -> "someKey");
    }
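To make the session-window semantics concrete, here is a simplified model of how records for one key fall into sessions: a record extends the current session if its timestamp is at most the inactivity gap after the session's last record; otherwise it starts a new session. This is a sketch that assumes in-order timestamps for a single key and uses a hypothetical SessionModel class. It is not Kafka's implementation, which also merges existing sessions when an out-of-order record bridges them.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionModel {

    /** A session covers the closed interval [start, end] of record timestamps, in ms. */
    static final class Session {
        final long start;
        final long end;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    /**
     * Simplified model (assumption: timestamps sorted ascending, all for the same key):
     * a record joins the current session if it is at most gapMs after the session's end.
     */
    static List<Session> sessionize(long[] sortedTimestamps, long gapMs) {
        List<Session> sessions = new ArrayList<>();
        if (sortedTimestamps.length == 0) {
            return sessions;
        }
        long start = sortedTimestamps[0];
        long end = sortedTimestamps[0];
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long ts = sortedTimestamps[i];
            if (ts - end <= gapMs) {
                end = ts; // within the inactivity gap: extend the current session
            } else {
                sessions.add(new Session(start, end)); // gap exceeded: close the session
                start = ts;
                end = ts;
            }
        }
        sessions.add(new Session(start, end));
        return sessions;
    }

    public static void main(String[] args) {
        // Hypothetical timestamps for one correlation ID, with the 300 ms gap from the question.
        long[] timestamps = {0, 100, 350, 1000, 1200};
        System.out.println(sessionize(timestamps, 300)); // prints [[0, 350], [1000, 1200]]
    }
}
```

The 650 ms pause between 350 and 1000 exceeds the 300 ms gap, so the model produces two sessions.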

This stream process works as expected. But for unit tests, that's a different story.

My test stream configuration looks like this:

        // ...

        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, myCustomObjectSerde.getClass());
        // disable cache
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        // commit ASAP
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);


        StreamsBuilder builder = new StreamsBuilder();
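        // wire the topology under test: read from INPUT_TOPIC, write the aggregates to OUTPUT_TOPIC
        aggregate(builder.stream(INPUT_TOPIC))
                .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), new AggCustomObjectSerde()));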

        Topology topology = builder.build();
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);
        ConsumerRecordFactory<String, MyCustomObject> factory =
                new ConsumerRecordFactory<>(INPUT_TOPIC, new StringSerializer(), myCustomSerializer);

        // ...

The test looks like this:

List<ConsumerRecord<byte[], byte[]>> records = myCustomMessages.stream()
        .map(myCustomMessage -> factory.create(INPUT_TOPIC, myCustomMessage.correlationId, myCustomMessage))
        .collect(Collectors.toList());
testDriver.pipeInput(records);

// readOutput expects Deserializers, so pass the serde's deserializer rather than the serde itself
ProducerRecord<String, AggregatedCustomMessage> record =
        testDriver.readOutput(OUTPUT_TOPIC, new StringDeserializer(), myAggregatedCustomObjectSerde.deserializer());

The problem is that record is always null. I tried a lot of things:

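  • Reading the output in a loop, with a timeout
  • Changing the commit interval in the config so that results are committed as soon as possible
  • Immediately sending an additional record with a different key (to trigger the window closing, since in Kafka Streams event time is based on record timestamps)
  • Calling the test driver's advanceWallClockTime method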

Nothing helps. Could someone tell me what I am missing, and how I should test a stream application based on session windows?

Thanks a lot.

Recommended answer

SessionWindows work with event time, not wall-clock time. Try setting the event time of your records explicitly to simulate the inactivity gap, for example:

testDriver.pipeInput(factory.create(INPUT_TOPIC, key1, record1, eventTimeMs));
testDriver.pipeInput(factory.create(INPUT_TOPIC, key2, record2, eventTimeMs + inactivityGapMs));

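But first, make sure the event time is taken from the record timestamp. The default extractor (FailOnInvalidTimestamp) already uses the record's embedded timestamp; to make this explicit, you can use a custom TimestampExtractor like: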

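import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// nested in the test class: uses the record's own (producer/factory) timestamp as event time
public static class RecordTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        return record.timestamp();
    }
}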

Register it like this:

streamProperties.setProperty(
        StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
        RecordTimestampExtractor.class.getName()
);
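Putting the timing together: stream time only advances with record timestamps, so advanceWallClockTime (which drives wall-clock punctuations) cannot close a session window. If I read its Javadoc correctly, the three-argument ConsumerRecordFactory constructor used in the question also stamps every record with the same start time because auto-advance is disabled, so the extra record with a different key never moves stream time forward. In addition, with suppress(untilWindowCloses(...)) a session is, as far as I know, only emitted once stream time reaches the session end plus the inactivity gap plus the grace period, i.e. 600 ms after the session's last record with the settings in the question. The hypothetical SuppressionModel below is a plain-Java sketch of that rule under these assumptions, not Kafka code.

```java
public class SuppressionModel {

    /**
     * Simplified model (assumption, not Kafka's implementation): a buffered session is
     * emitted once stream time, i.e. the largest record timestamp seen so far, reaches
     * sessionEnd + inactivity gap + grace period. Wall-clock time plays no role here.
     */
    static boolean isEmitted(long sessionEndMs, long[] pipedTimestamps, long gapMs, long graceMs) {
        long streamTime = Long.MIN_VALUE;
        for (long ts : pipedTimestamps) {
            streamTime = Math.max(streamTime, ts); // only record timestamps move stream time
        }
        return streamTime >= sessionEndMs + gapMs + graceMs;
    }

    public static void main(String[] args) {
        long gap = 300;
        long grace = 300;
        long t0 = 1_000L; // hypothetical timestamp of the session's last record

        // Every record, including the extra "flush" record, carries the same timestamp.
        System.out.println(isEmitted(t0, new long[] {t0, t0, t0}, gap, grace));          // false
        // Flush record only one inactivity gap later.
        System.out.println(isEmitted(t0, new long[] {t0, t0 + gap}, gap, grace));        // false
        // Flush record at least gap + grace later.
        System.out.println(isEmitted(t0, new long[] {t0, t0 + gap + grace}, gap, grace)); // true
    }
}
```

If the model holds, the "flush" record in a test should carry a timestamp of at least lastRecordTimestamp + INACTIVITY_GAP + grace, set for example with the four-argument factory.create(topic, key, value, timestampMs) call shown in the answer.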
