Flink中复杂拓扑(多个输入)的集成测试 [英] Integration test for complex topology (multiple inputs) in Flink

查看:219
本文介绍了Flink中复杂拓扑(多个输入)的集成测试的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要为flink流拓扑编写单元测试.它基本上是一个CoFlatMapFunction,它有2个输入.

我试图从此页面中获得一些启发:解决方案

flink培训练习中有一个使用TwoInputStreamOperatorTestHarness的示例,您可以参考以下示例:

https://github.com/dataArtisans/flink-training-exercises/blob/master/src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/process/EventTimeJoinTest.java

您将需要以下依赖项:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils-junit</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>

<dependency>
  <groupId>org.mockito</groupId>
  <artifactId>mockito-all</artifactId>
  <version>1.10.19</version>
  <type>jar</type>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>

您应该记住,这不是受支持的公共界面,因此它可能会以意想不到的方式演变.

I need to write unit test for a flink streaming topology. It's basically a CoFlatMapFunction, and it has 2 inputs.

I try to get some inspiration from this page: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

The order of the inputs matter to my topology, so when I test, I can't use StreamExecutionEnvironment#fromCollection for each input, as I won't control the order in which data points are injected in each input.

I've tried to create a single input using StreamExecutionEnvironment#fromCollection and dispatch each element to the actual input of my CoFlatMapFunction based on their type, but the order of elements is lost in this operation.

Is there another way to write this test?

解决方案

The flink training exercises have an example of using a TwoInputStreamOperatorTestHarness that you can refer to:

https://github.com/dataArtisans/flink-training-exercises/blob/master/src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/process/EventTimeJoinTest.java

You'll need these dependencies:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils-junit</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>

<dependency>
  <groupId>org.mockito</groupId>
  <artifactId>mockito-all</artifactId>
  <version>1.10.19</version>
  <type>jar</type>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>

You should keep in mind that this isn't a public, supported interface, so it could evolve in unexpected ways.

这篇关于Flink中复杂拓扑(多个输入)的集成测试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆