测试 Kafka Streams 拓扑 [英] Test Kafka Streams topology

查看:30
本文介绍了测试 Kafka Streams 拓扑的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在寻找一种测试 Kafka Streams 应用程序的方法.这样我就可以定义输入事件,并且测试套件会向我显示输出.

I'm searching a way to test a Kafka Streams application. So that I can define the input events and the test suite shows me the output.

如果没有真正的 Kafka 设置,这可能吗?

Is this possible without a real Kafka setup?

推荐答案

更新 Kafka 1.1.0(2018 年 3 月 23 日发布):

Update Kafka 1.1.0 (released 23-Mar-2018):

KIP-247 添加了官方测试工具.根据升级指南:

KIP-247 added official test utils. Per the Upgrade Guide:

有一个新的工件 kafka-streams-test-utils 提供了 TopologyTestDriverConsumerRecordFactoryOutputVerifier> 班级.您可以将新工件作为常规依赖项包含在单元测试中,并使用测试驱动程序来测试 Kafka Streams 应用程序的业务逻辑.更多详情请参见 KIP-247.

There is a new artifact kafka-streams-test-utils providing a TopologyTestDriver, ConsumerRecordFactory, and OutputVerifier class. You can include the new artifact as a regular dependency to your unit tests and use the test driver to test your business logic of your Kafka Streams application. For more details, see KIP-247.

来自文档:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <version>1.1.0</version>
        <scope>test</scope>
    </dependency>

测试驱动程序模拟库运行时不断从输入主题中获取记录并通过遍历拓扑来处理它们.您可以使用测试驱动程序来验证您指定的处理器拓扑是否通过手动管道输入数据记录计算出正确的结果.测试驱动程序捕获结果记录并允许查询其嵌入的状态存储:

The test driver simulates the library runtime that continuously fetches records from input topics and processes them by traversing the topology. You can use the test driver to verify that your specified processor topology computes the correct result with the manually piped in data records. The test driver captures the results records and allows to query its embedded state stores:

    // Create your topology
    Topology topology = new Topology();
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    // Run it on the test driver
    TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);

    // Feed input data
    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
    testDriver.pipe(factory.create("key", 42L));

    // Verify output
    ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());

有关详细信息,请参阅文档.

See the documentation for details.

ProcessorTopologyTestDriver 从 0.11.0.0 开始可用.它在 kafka-streams 测试工件中可用(在 Maven 中用 test 指定):

ProcessorTopologyTestDriver is available as of 0.11.0.0. It is available in the kafka-streams test artifact (specified with <classifier>test</classifier> in Maven):

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.11.0.0</version>
        <classifier>test</classifier>
        <scope>test</scope>
    </dependency>

您还需要添加 kafka-clients 测试工件:

You will also need to add the kafka-clients test artifact:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
        <classifier>test</classifier>
        <scope>test</scope>
    </dependency>

然后就可以使用测试驱动了.根据 Javadoc,首先创建一个 ProcessorTopologyTestDriver:

Then you can use the test driver. Per the Javadoc, first create a ProcessorTopologyTestDriver:

    StringSerializer strSerializer = new StringSerializer();
    StringDeserializer strDeserializer = new StringDeserializer();
    Properties props = new Properties();
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
    props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
    props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
    props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
    props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
    props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
    StreamsConfig config = new StreamsConfig(props);
    TopologyBuilder builder = ...
    ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);

您可以将输入输入到拓扑中,就像您实际写入输入主题之一一样:

You can feed input into the topology as though you had actually written to one of the input topics:

    driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);

并阅读输出主题:

    ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
    ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
    ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);

然后您可以对这些结果进行断言.

Then you can assert on these results.

这篇关于测试 Kafka Streams 拓扑的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆