Flink中复杂拓扑(多输入)的集成测试 [英] Integration test for complex topology (multiple inputs) in Flink

查看:28
本文介绍了Flink中复杂拓扑(多输入)的集成测试的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要为 flink 流拓扑编写单元测试.它基本上是一个 CoFlatMapFunction,它有 2 个输入.

I need to write unit test for a flink streaming topology. It's basically a CoFlatMapFunction, and it has 2 inputs.

我试着从这个页面得到一些灵感:https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

I try to get some inspiration from this page: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

输入的顺序对我的拓扑很重要,所以当我测试时,我不能对每个输入使用 StreamExecutionEnvironment#fromCollection,因为我不会控制数据点的顺序在每个输入中注入.

The order of the inputs matter to my topology, so when I test, I can't use StreamExecutionEnvironment#fromCollection for each input, as I won't control the order in which data points are injected in each input.

我尝试使用 StreamExecutionEnvironment#fromCollection 创建单个输入,并根据它们的类型将每个元素分派到我的 CoFlatMapFunction 的实际输入,但顺序元素在此操作中丢失.

I've tried to create a single input using StreamExecutionEnvironment#fromCollection and dispatch each element to the actual input of my CoFlatMapFunction based on their type, but the order of elements is lost in this operation.

还有其他方法可以编写此测试吗?

Is there another way to write this test?

推荐答案

flink 训练练习中有一个使用 TwoInputStreamOperatorTestHarness 的例子,可以参考:

The flink training exercises have an example of using a TwoInputStreamOperatorTestHarness that you can refer to:

https://github.com/dataArtisans/flink-training-exercises/blob/master/src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/process/EventTimeJoinTest.java

您将需要这些依赖项:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils-junit</artifactId>
  <version>${flink.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>

<dependency>
  <groupId>org.mockito</groupId>
  <artifactId>mockito-all</artifactId>
  <version>1.10.19</version>
  <type>jar</type>
  <scope>test</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime_2.11</artifactId>
  <version>${flink.version}</version>
  <scope>test</scope>
  <type>test-jar</type>
</dependency>

您应该记住,这不是一个公开的、受支持的接口,因此它可能会以意想不到的方式发展.

You should keep in mind that this isn't a public, supported interface, so it could evolve in unexpected ways.

这篇关于Flink中复杂拓扑(多输入)的集成测试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆