测试Kafka Streams拓扑 [英] Test Kafka Streams topology

查看:100
本文介绍了测试Kafka Streams拓扑的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在寻找一种测试Kafka Streams应用程序的方法.这样我就可以定义输入事件,并且测试套件可以向我显示输出.

I'm searching a way to test a Kafka Streams application. So that I can define the input events and the test suite shows me the output.

如果没有真正的Kafka设置,这可能吗?

Is this possible without a real Kafka setup?

推荐答案

更新 Kafka 1.1.0(2018年3月23日发布):

Update Kafka 1.1.0 (released 23-Mar-2018):

KIP-247 添加了官方测试工具.根据升级指南:

KIP-247 added official test utils. Per the Upgrade Guide:

有一个新的工件kafka-streams-test-utils,提供了TopologyTestDriverConsumerRecordFactoryOutputVerifier类.您可以将新工件作为单元测试的常规依赖项包括在内,并使用测试驱动程序来测试Kafka Streams应用程序的业务逻辑.有关更多详细信息,请参见 KIP-247 .

There is a new artifact kafka-streams-test-utils providing a TopologyTestDriver, ConsumerRecordFactory, and OutputVerifier class. You can include the new artifact as a regular dependency to your unit tests and use the test driver to test your business logic of your Kafka Streams application. For more details, see KIP-247.

文档:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>1.1.0</version>
    <scope>test</scope>
</dependency>

测试驱动程序模拟库运行时,该运行时从输入主题中连续获取记录并通过遍历拓扑对其进行处理.您可以使用测试驱动程序来验证您指定的处理器拓扑是否通过手动管道传输到数据记录中来计算出正确的结果.测试驱动程序捕获结果记录,并允许查询其嵌入式状态存储:

The test driver simulates the library runtime that continuously fetches records from input topics and processes them by traversing the topology. You can use the test driver to verify that your specified processor topology computes the correct result with the manually piped in data records. The test driver captures the results records and allows to query its embedded state stores:

// Create your topology
Topology topology = new Topology();
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);

// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
testDriver.pipe(factory.create("key", 42L));

// Verify output
ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());

有关详细信息,请参见文档.

See the documentation for details.

ProcessorTopologyTestDriver is available as of 0.11.0.0. It is available in the kafka-streams test artifact (specified with <classifier>test</classifier> in Maven):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.11.0.0</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

您还需要添加kafka-clients测试工件:

You will also need to add the kafka-clients test artifact:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>

然后可以使用测试驱动程序.根据Javadoc,首先创建ProcessorTopologyTestDriver:

Then you can use the test driver. Per the Javadoc, first create a ProcessorTopologyTestDriver:

StringSerializer strSerializer = new StringSerializer();
StringDeserializer strDeserializer = new StringDeserializer();
Properties props = new Properties();
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
StreamsConfig config = new StreamsConfig(props);
TopologyBuilder builder = ...
ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);

您可以将输入提供给拓扑,就像您实际上已经写了输入主题之一一样:

You can feed input into the topology as though you had actually written to one of the input topics:

driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);

并阅读输出主题:

ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);

然后您可以对这些结果进行断言.

Then you can assert on these results.

这篇关于测试Kafka Streams拓扑的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆