Kafka Streams tests do not close correctly
Problem description
I have 2 unit tests. When I run them I get the error below.
Test 1:
@Test
public void simpleInsertAndOutputEventPrint() throws IOException, URISyntaxException {
    GenericRecord record = getInitialEvent();
    testDriver.pipeInput(recordFactory.create(record));
    GenericRecord result = testDriver.readOutput(detailsEventTopic, stringDeserializer, genericAvroSerde.deserializer()).value();
    Assert.assertEquals(1, result.get("tt"));
}
Test 2:
@Test
public void stateStoreSimpleInsertOutputPrint() {
    GenericRecord record = getInitialAvayaEvent();
    testDriver.pipeInput(recordFactory.create(record));
    Packet packet1 = (Packet) store.get("dddfdfdf");
    Assert.assertEquals("ddd", packet1.getc1());
}
Setup method:
@Before
public void setUp() throws IOException, RestClientException, URISyntaxException {
    ...
    recordFactory = new ConsumerRecordFactory<>(initialSourceTopic, new StringSerializer(), genericAvroSerde.serializer());
    testDriver = new TopologyTestDriver(topology, props);
    this.store = testDriver.getKeyValueStore(db);
}
Also, when I tried to add the following code:
@After
public void tearDown() {
    testDriver.close(); // Close processors after finishing the tests
}
I got the following error:
[2018-09-25 22:45:38,178] ERROR stream-thread [main] Failed to delete the state directory. (org.apache.kafka.streams.processor.internals.StateDirectory)
java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\ks-stock-analysis-appid\0_0
    at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
    at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
    at java.nio.file.Files.delete(Files.java:1126)
    at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:740)
    at org.apache.kafka.common.utils.Utils$2.postVisitDirectory(Utils.java:723)
    at java.nio.file.Files.walkFileTree(Files.java:2688)
    at java.nio.file.Files.walkFileTree(Files.java:2742)
    at org.apache.kafka.common.utils.Utils.delete(Utils.java:723)
    at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:287)
    at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:228)
    at org.apache.kafka.streams.TopologyTestDriver.close(TopologyTestDriver.java:679)
    at com.dvsts.avaya.processing.topology.TopologyKafkaStreamTest.tearDown(TopologyKafkaStreamTest.java:235)
Recommended answer
For tests one could use an in-memory store for each KTable created (directly or indirectly, e.g. by aggregations); this avoids the creation of any state directory on disk, so the error no longer occurs.
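As a minimal sketch of what that could look like: an aggregation's KTable can be backed by an in-memory store via `Materialized.as(Stores.inMemoryKeyValueStore(...))` instead of the default RocksDB store. The topic name `input-topic` and store name `counts-store` here are placeholders, not names from the question.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class InMemoryStoreExample {
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Back the KTable produced by this count() with an in-memory store
        // instead of the default RocksDB store, so TopologyTestDriver.close()
        // has no on-disk state directory to delete.
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .count(Materialized.<String, Long>as(
                       Stores.inMemoryKeyValueStore("counts-store")));
        return builder.build();
    }

    public static void main(String[] args) {
        // The described topology should list "counts-store" as a state store.
        System.out.println(buildTopology().describe());
    }
}
```

With this, the existing `tearDown()` can keep calling `testDriver.close()`; since no RocksDB files are created, the Windows file-lock issue behind the `DirectoryNotEmptyException` does not arise.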