使用 MiniCluster 测试 flink 作业以使用处理时间触发计时器 [英] testing flink jobs with MiniCluster to trigger the timer using processing time

查看:18
本文介绍了使用 MiniCluster 测试 flink 作业以使用处理时间触发计时器的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在使用MiniClusterWithClientResource测试flink作业时,有没有办法控制触发定时器的处理时间?

is there any way to control the processing time to trigger the timer when testing flink jobs with MiniClusterWithClientResource?

我能够在单元测试中使用testharness<来测试KeyedCoProcessFunction的两种方法,即processElement()...触发计时器回调即onTimer().../strong> 并控制处理时间,即:

I'm able to test both the methods of the KeyedCoProcessFunction i.e. processElement()... triggering timer callback i.e onTimer()... in the Unit Tests using a testharness and controlling the processing time i.e:

//直接提前算子的处理时间触发处理时间定时器testHarness.setProcessingTime(300000)

//trigger processing time timer by advancing the processing time of the operator directly testHarness.setProcessingTime(300000)

这样.我可以在指定时间触发定时器

Thus. I can trigger the timer at the specified time

但是,我现在需要的是使用 miniCluster MiniClusterWithClientResource

however, what I need now is trigger the timer in a end to end flink job test using minicluster MiniClusterWithClientResource

val flinkCluster = 新的 MiniClusterWithClientResource...并且能够提前处理时间来触发 onTimer 方法

val flinkCluster = new MiniClusterWithClientResource... and be able to advance the processing time to fire the onTimer method

推荐答案

在发送完所有消息后在 SourceFunction 类中添加 Thread.sleep(1000) 一秒钟解决了问题.

adding a Thread.sleep(1000) one second in the SourceFunction class after all messages are sent solved the problem.

class MySourceFunction() extends RichParallelSourceFunction[]{
...

//is a one-time Delay after all messages have been sent
Thread.sleep(1000)
}

这篇关于使用 MiniCluster 测试 flink 作业以使用处理时间触发计时器的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆