如何对 Flink ProcessFunction 进行单元测试? [英] How to unit test a Flink ProcessFunction?

查看:49
本文介绍了如何对 Flink ProcessFunction 进行单元测试?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个简单的 ProcessFunction,它接受字符串作为输入并给出一个字符串作为输出.我如何使用 Junit 对其进行单元测试?由于 processElement 方法是一个 void 方法并且不返回任何值.

public class SampleProcessFunction extends ProcessFunction{@覆盖public void processElement(字符串内容,上下文上下文,收集器<字符串>收集器)抛出异常{字符串输出=内容+输出";收集器.收集(输出);}}

解决方案

为了对这个方法进行单元测试,定义预期的行为.在这种情况下,预期行为是使用 content + "output" 作为参数对 Collector::collect 方法的单次调用.因此,这可以使用模拟收集器进行测试.

这是一个使用 Mockito 框架的示例:

<预><代码>...私有最终收集器CollectorMock = Mockito.mock(Collector.class);private final Context contextMock = Mockito.mock(Context.class);private final SampleProcessFunction sampleProcessFunction = new SampleProcessFunction();@测试public void testProcessElement_shouldInvokeCollector_whenAnyValuePassed() 抛出异常 {//给定最终字符串内容 = "你好";//什么时候sampleProcessFunction.processElement(内容,contextMock,collectorMock);//然后Mockito.verify(collectorMock).collect(content + "output");//验证收集器方法被hello output"调用了一次}...

I have a simple ProcessFunction that takes in String as input and gives a String as output. How do I unit test this using Junit? As the processElement method is a void method and returns no value.

public class SampleProcessFunction extends ProcessFunction<String, String>{
    @Override
    public void processElement(String content, Context context, Collector<String> collector) throws Exception {
        String output = content + "output";
        collector.collect(output);
    }
}

解决方案

In order to unit test this method, define the expected behavior. In this case, the expected behavior is a single invocation of Collector::collect method with content + "output" as an argument. Thereby, this could be tested using mocked collector.

Here is an example using Mockito framework:

...

private final Collector<String> collectorMock = Mockito.mock(Collector.class);
private final Context contextMock = Mockito.mock(Context.class);

private final SampleProcessFunction sampleProcessFunction = new SampleProcessFunction();

@Test
public void testProcessElement_shouldInvokeCollector_whenAnyValuePassed() throws Exception {
    // given
    final String content = "hello ";

    // when
    sampleProcessFunction.processElement(content, contextMock, collectorMock);

    // then
    Mockito.verify(collectorMock).collect(content + "output"); // verifies that collector method was invoked with "hello output" exactly once
}

...

这篇关于如何对 Flink ProcessFunction 进行单元测试?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆