如何对Flink ProcessFunction进行单元测试? [英] How to unit test a Flink ProcessFunction?
问题描述
我有一个简单的ProcessFunction,它接受String作为输入,并给出一个String作为输出.如何使用Junit对此进行单元测试?由于processElement方法是一个void方法,因此不返回任何值.
I have a simple ProcessFunction that takes in String as input and gives a String as output. How do I unit test this using Junit? As the processElement method is a void method and returns no value.
public class SampleProcessFunction extends ProcessFunction<String, String>{
@Override
public void processElement(String content, Context context, Collector<String> collector) throws Exception {
String output = content + "output";
collector.collect(output);
}
}
推荐答案
为了对该方法进行单元测试,请定义预期的行为.在这种情况下,预期的行为是使用content + "output"
作为参数的Collector::collect
方法的单次调用.
因此,可以使用模拟收集器对此进行测试.
In order to unit test this method, define the expected behavior. In this case, the expected behavior is a single invocation of Collector::collect
method with content + "output"
as an argument.
Thereby, this could be tested using mocked collector.
以下是使用 Mockito 框架的示例:
Here is an example using Mockito framework:
...
private final Collector<String> collectorMock = Mockito.mock(Collector.class);
private final Context contextMock = Mockito.mock(Context.class);
private final SampleProcessFunction sampleProcessFunction = new SampleProcessFunction();
@Test
public void testProcessElement_shouldInvokeCollector_whenAnyValuePassed() throws Exception {
// given
final String content = "hello ";
// when
sampleProcessFunction.processElement(content, contextMock, collectorMock);
// then
Mockito.verify(collectorMock).collect(content + "output"); // verifies that collector method was invoked with "hello output" exactly once
}
...
这篇关于如何对Flink ProcessFunction进行单元测试?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!