如何正确测试Flink窗口功能? [英] How to properly test a Flink window function?
问题描述
有人知道如何在 Flink
中测试开窗功能吗?我正在使用依赖项 flink-test-utils_2.11
.
Does anyone know how to test windowing functions in Flink
? I am using the dependency flink-test-utils_2.11
.
我的步骤是:
- 获取
StreamExecutionEnvironment
- 创建对象并添加到环境中
- 执行
keyBy
- 添加会话窗口
- 执行聚合函数
public class AggregateVariantCEVTest extends AbstractTestBase {
@Test
public void testAggregateVariantCev() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromElements(objectOne, objectTwo)
.keyBy(new KeyedByMyCustomKey())
.window(EventTimeSessionWindows.withGap(Time.seconds(1)))
.aggregate(new MyAgreggateFunction());
JobExecutionResult result = env.execute();
assertEquals(myExpectedResults, result.getAllAccumulatorResults());
}
}
问题在于 result.getAllAccumulatorResults()
的大小为0.
有什么想法我做错了吗?预先感谢!
Any ideas what I am doing wrong? Thanks in advance!
推荐答案
这里正确的方法可能是使用 TestHarness
.一个很好的例子是Flink项目本身中的 WindowOperatorTest
.
Probably the right approach here is to use a TestHarness
. A good example is the WindowOperatorTest
in the Flink project itself.
此外,您可以结帐 https://github.com/knaufk/flink-testing-金字塔(例如如何在测试金字塔的不同级别上测试Flink Job)以及有关测试
Furthermore, you can checkout https://github.com/knaufk/flink-testing-pyramid for examples how to test Flink Job on different levels of the testing pyramid and the Flink documentation on testing https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html.
这篇关于如何正确测试Flink窗口功能?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!