使用 MiniCluster 测试 flink 作业以使用处理时间触发计时器 [英] testing flink jobs with MiniCluster to trigger the timer using processing time
问题描述
在使用MiniClusterWithClientResource测试flink作业时,有没有办法控制触发定时器的处理时间?
is there any way to control the processing time to trigger the timer when testing flink jobs with MiniClusterWithClientResource?
我能够在单元测试中使用testharness<来测试KeyedCoProcessFunction的两种方法,即processElement()...触发计时器回调即onTimer().../strong> 并控制处理时间,即:
I'm able to test both the methods of the KeyedCoProcessFunction i.e. processElement()... triggering timer callback i.e onTimer()... in the Unit Tests using a testharness and controlling the processing time i.e:
//直接提前算子的处理时间触发处理时间定时器testHarness.setProcessingTime(300000)
//trigger processing time timer by advancing the processing time of the operator directly testHarness.setProcessingTime(300000)
这样.我可以在指定时间触发定时器
Thus. I can trigger the timer at the specified time
但是,我现在需要的是使用 miniCluster MiniClusterWithClientResource
however, what I need now is trigger the timer in a end to end flink job test using minicluster MiniClusterWithClientResource
val flinkCluster = 新的 MiniClusterWithClientResource...并且能够提前处理时间来触发 onTimer 方法
val flinkCluster = new MiniClusterWithClientResource... and be able to advance the processing time to fire the onTimer method
推荐答案
在发送完所有消息后在 SourceFunction 类中添加 Thread.sleep(1000) 一秒钟解决了问题.
adding a Thread.sleep(1000) one second in the SourceFunction class after all messages are sent solved the problem.
class MySourceFunction() extends RichParallelSourceFunction[]{
...
//is a one-time Delay after all messages have been sent
Thread.sleep(1000)
}
这篇关于使用 MiniCluster 测试 flink 作业以使用处理时间触发计时器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!