如何正确测试 Flink 窗口函数? [英] How to properly test a Flink window function?

查看:19
本文介绍了如何正确测试 Flink 窗口函数?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

有谁知道如何在Flink中测试窗口函数?我正在使用依赖项 flink-test-utils_2.11.

Does anyone know how to test windowing functions in Flink? I am using the dependency flink-test-utils_2.11.

我的步骤是:

  1. 获取StreamExecutionEnvironment
  2. 创建对象并添加到环境
  3. 做一个keyBy
  4. 添加会话窗口
  5. 执行聚合函数

public class AggregateVariantCEVTest extends AbstractTestBase {

   @Test
    public void testAggregateVariantCev() throws Exception  {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       env.setParallelism(1);

       env.fromElements(objectOne, objectTwo)
               .keyBy(new KeyedByMyCustomKey())
               .window(EventTimeSessionWindows.withGap(Time.seconds(1)))
               .aggregate(new MyAgreggateFunction());


       JobExecutionResult result = env.execute();

       assertEquals(myExpectedResults, result.getAllAccumulatorResults());

   }
}

问题在于 result.getAllAccumulatorResults() 大小为 0.

The problem is that result.getAllAccumulatorResults() size is 0.

任何想法我做错了什么?提前致谢!

Any ideas what I am doing wrong? Thanks in advance!

推荐答案

这里可能正确的方法是使用 TestHarness.一个很好的例子是 Flink 项目本身中的 WindowOperatorTest.

Probably the right approach here is to use a TestHarness. A good example is the WindowOperatorTest in the Flink project itself.

此外,您可以查看https://github.com/knaufk/flink-testing-金字塔 示例如何在测试金字塔的不同级别上测试 Flink Job 以及有关测试的 Flink 文档 https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html.

Furthermore, you can checkout https://github.com/knaufk/flink-testing-pyramid for examples how to test Flink Job on different levels of the testing pyramid and the Flink documentation on testing https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/testing.html.

这篇关于如何正确测试 Flink 窗口函数?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆