如何正确测试 Flink 窗口函数? [英] How to properly test a Flink window function?
问题描述
有谁知道如何在Flink
中测试窗口函数?我正在使用依赖项 flink-test-utils_2.11
.
Does anyone know how to test windowing functions in Flink
? I am using the dependency flink-test-utils_2.11
.
我的步骤是:
- 获取
StreamExecutionEnvironment
- 创建对象并添加到环境
- 做一个
keyBy
- 添加会话窗口
- 执行聚合函数
public class AggregateVariantCEVTest extends AbstractTestBase {
@Test
public void testAggregateVariantCev() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.fromElements(objectOne, objectTwo)
.keyBy(new KeyedByMyCustomKey())
.window(EventTimeSessionWindows.withGap(Time.seconds(1)))
.aggregate(new MyAgreggateFunction());
JobExecutionResult result = env.execute();
assertEquals(myExpectedResults, result.getAllAccumulatorResults());
}
}
问题在于 result.getAllAccumulatorResults()
大小为 0.
The problem is that result.getAllAccumulatorResults()
size is 0.
任何想法我做错了什么?提前致谢!
Any ideas what I am doing wrong? Thanks in advance!
推荐答案
这里可能正确的方法是使用 TestHarness
.一个很好的例子是 Flink 项目本身中的 WindowOperatorTest
.
Probably the right approach here is to use a TestHarness
. A good example is the WindowOperatorTest
in the Flink project itself.
此外,您可以查看https://github.com/knaufk/flink-testing-金字塔 示例如何在测试金字塔的不同级别上测试 Flink Job 以及有关测试的 Flink 文档 https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html.
Furthermore, you can checkout https://github.com/knaufk/flink-testing-pyramid for examples how to test Flink Job on different levels of the testing pyramid and the Flink documentation on testing https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html.
这篇关于如何正确测试 Flink 窗口函数?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!