状态过期时触发Flink [英] Flink Trigger when State expires
问题描述
我有一个有趣的用例,我想用Flink进行测试.我有一个Message
的传入流,它是PASS
或FAIL
.现在,如果消息的类型为FAIL
,则有一个下游ProcessFunction
,它保存Message
状态,然后将pause
命令发送给所有与此相关的命令.当我收到与先前收到的FAIL
关联的PASS
消息(按消息ID键入)时,我会将resume
命令发送到我之前暂停的所有内容.
I have an interesting use case which I want to test with Flink. I have an incoming stream of Message
which is either PASS
or FAIL
. Now if the message is of type FAIL
, I have a downstream ProcessFunction
which saves the Message
state and then sends pause
commands to everything that depends on this. When I receive a PASS
message which is associated with the FAIL
I had received earlier (keying by message id), I send resume
commands to everything I had paused earlier.
现在,我计划使用状态TTL来使存储的FAIL
状态失效,并在某个超时后恢复所有内容,即使我没有收到具有相同消息ID的PASS
消息也是如此.可以单独使用Flink完成此操作,还是需要一些外部计时器将超时消息发送到我的程序?
Now I plan on using State TTL to expire the stored FAIL
state and resume everything after a certain timeout even if I haven't received a PASS
message with the same message id. Could this be done with Flink alone or would I need to have some external timer to send timeout messages to my program?
我想让它在Flink中工作是这样的:
I had something like this in mind to get it working in Flink:
对于每个Message
,添加时间戳并将其传递给过程函数,该过程将等到current_ts - timestamp == timeout
之后再发送,以恢复模块暂停的所有内容.有没有更好的方法,或者你们认为这还好吗?
For each Message
, add timestamp and pass it on to a process function which waits until current_ts - timestamp == timeout
before sending it on to resume everything paused by the module. Is there a better way or do you guys think this is ok?
推荐答案
似乎使用计时器使状态过期(通过在onTimer方法中调用state.clear())更简单,而不是使用state TTL.相同的onTimer方法还可以安排事物同时恢复.
Seems like it would be more straightforward to use a timer to expire the state (by calling state.clear() in the onTimer method), rather than using state TTL. The same onTimer method can also arrange for things to resume at the same time.
这篇关于状态过期时触发Flink的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!