Flink中复杂拓扑(多个输入)的集成测试 [英] Integration test for complex topology (multiple inputs) in Flink
问题描述
我需要为flink流拓扑编写单元测试.它基本上是一个CoFlatMapFunction
,它有2个输入.
我试图从此页面中获得一些启发:解决方案
flink培训练习中有一个使用TwoInputStreamOperatorTestHarness的示例,您可以参考以下示例:
您将需要以下依赖项:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
您应该记住,这不是受支持的公共界面,因此它可能会以意想不到的方式演变.
I need to write unit test for a flink streaming topology. It's basically a CoFlatMapFunction
, and it has 2 inputs.
I try to get some inspiration from this page: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
The order of the inputs matter to my topology, so when I test, I can't use StreamExecutionEnvironment#fromCollection
for each input, as I won't control the order in which data points are injected in each input.
I've tried to create a single input using StreamExecutionEnvironment#fromCollection
and dispatch each element to the actual input of my CoFlatMapFunction
based on their type, but the order of elements is lost in this operation.
Is there another way to write this test?
The flink training exercises have an example of using a TwoInputStreamOperatorTestHarness that you can refer to:
You'll need these dependencies:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_2.11</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
You should keep in mind that this isn't a public, supported interface, so it could evolve in unexpected ways.
这篇关于Flink中复杂拓扑(多个输入)的集成测试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!