Flink 自定义触发器给出意外输出 [英] Flink Custom Trigger giving Unexpected Output
问题描述
我想创建一个 Trigger
,它第一次在 20 秒内触发,之后每五秒触发一次.我使用了 GlobalWindows
和一个自定义的 Trigger
I want to create a Trigger
which gets fired in 20 seconds for the first time and in every five seconds after that. I have used GlobalWindows
and a custom Trigger
val windowedStream = valueStream
.keyBy(0)
.window(GlobalWindows.create())
.trigger(TradeTrigger.of())
这是TradeTrigger
中的代码:
@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
static boolean flag=false;
static long ctime = System.currentTimeMillis();
private TradeTrigger() {
}
@Override
public TriggerResult onElement(
Object arg0,
long arg1,
W arg2,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
throws Exception {
// TODO Auto-generated method stub
if(flag == false){
if((System.currentTimeMillis()-ctime) >= 20000){
flag = true;
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
} else {
if((System.currentTimeMillis()-ctime) >= 5000){
ctime = System.currentTimeMillis();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
}
@Override
public TriggerResult onEventTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(
long arg0,
W arg1,
org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
throws Exception {
// TODO Auto-generated method stub
return TriggerResult.CONTINUE;
}
public static <W extends Window> TradeTrigger<W> of() {
return new TradeTrigger<>();
}
}
所以基本上,当 flag
为 false
时,即第一次,Trigger
应该在 20 秒内触发并设置 flag
到 true
.从下一次开始,它应该每 5 秒触发一次.
So basically, when flag
is false
, i.e. the first time, the Trigger
should get fired in 20 seconds and set the flag
to true
. From the next time, it should get fired every 5 seconds.
我面临的问题是,每次触发 Trigger
时,我只在输出中收到一条消息.也就是说,我在 20 秒后收到一条消息,每五秒收到一条消息.我期望每次触发时输出中有 20 条消息.
The problem I am facing is, I am getting only one message in the output every time the Trigger
is fired. That is, I get a single message after 20 seconds and single messages in every five seconds.
I am expecting twenty messages in the output on each triggering.
如果我使用 .timeWindow(Time.seconds(5))
并创建一个 5 秒的时间窗口,我会每 5 秒输出 20 条消息.请帮助我正确获取此代码.有什么我遗漏的吗?
If I use .timeWindow(Time.seconds(5))
and create a time window of five seconds, I get 20 messages in output every 5 seconds.
Please help me get this code right. Is there something I am missing?
推荐答案
在来自 Fabian 和 Flink 邮件列表的答案的帮助下得到了解决.通过 TriggerContext
将状态存储在 ValueState
变量中.检查onEvent()
方法中的变量,如果是第一次,注册一个processingTimeTimer
比当前时间多20秒并更新状态.在 onProcessingTime
方法中,注册另一个 ProcessingTimeTimer
比当前时间多 5 秒,更新状态并触发 Window
.
Got it working with the help of the answer from Fabian and Flink mailing lists.
Stored the state in a ValueState
variable through the TriggerContext
. Checked the variable in onEvent()
method and if it was the first time, registered a processingTimeTimer
for 20 seconds more than the current time and updated the state. In the onProcessingTime
method, registered another ProcessingTimeTimer
for 5 seconds more than current time, updated the state and fired the Window
.
这篇关于Flink 自定义触发器给出意外输出的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!