How to implement a Flink Event Time Trigger that emits after no events received for X minutes

Question

I'm struggling a bit to understand how Flink Triggers work. My datastream contains events with a sessionId, and I aggregate the events by that sessionId. Each session will contain a Started and an Ended event; however, sometimes the Ended event will be lost.

To handle this, I've set up a Trigger that emits the aggregated session whenever the Ended event is processed. But if no events arrive from that session for 2 minutes, I want to emit whatever we have aggregated so far (the apps that send the events send heartbeats every minute, so if we don't get any events the session is considered lost).

I've set up the following trigger function:

public class EventTimeProcessingTimeTrigger extends Trigger<HashMap, TimeWindow> {
    private final long sessionTimeout;
    private long lastSetTimer;

    // Max session length set to 1 day
    public static final long MAX_SESSION_LENGTH = 1000L * 86400L;

    // End session events
    private static ImmutableSet<String> endSession = ImmutableSet.<String>builder()
            .add("Playback.Aborted")
            .add("Playback.Completed")
            .add("Playback.Error")
            .add("Playback.StartAirplay")
            .add("Playback.StartCasting")
            .build();

    public EventTimeProcessingTimeTrigger(long sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    @Override
    public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        lastSetTimer = ctx.getCurrentProcessingTime() + sessionTimeout;
        ctx.registerProcessingTimeTimer(lastSetTimer);

        if(endSession.contains(element.get(Field.EVENT_TYPE))) {
            return TriggerResult.FIRE_AND_PURGE;
        }

        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return time == window.maxTimestamp() ?
                TriggerResult.FIRE_AND_PURGE :
                TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteProcessingTimeTimer(lastSetTimer);
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window,
                        OnMergeContext ctx) {
        ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + sessionTimeout);
    }
}

To assign timestamps and watermarks to the events, I use the timestamps set by the apps, since appEventTime might not be the same as wallClock on the server. I extract them like this:

DataStream<HashMap> playerEvents = env
                .addSource(kafkaConsumerEvents, "playerEvents(Kafka)")
                .name("Read player events from Kafka")
                .uid("Read player events from Kafka")
                .map(json -> DECODER.decode(json, TypeToken.of(HashMap.class))).returns(HashMap.class)
                .name("Map Json to HashMap")
                .uid("Map Json to HashMap")
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<HashMap>(org.apache.flink.streaming.api.windowing.time.Time.seconds(30))
                {
                    @Override
                    public long extractTimestamp(HashMap element)
                    {
                        // CanonicalTime is the event timestamp set by the app
                        return (long) element.get("CanonicalTime");
                    }
                })
                .name("Add CanonicalTime as timestamp")
                .uid("Add CanonicalTime as timestamp");

What I find strange is that when I run the code in debug mode and set a breakpoint in the clear function of the Trigger, it gets called constantly, even when no FIRE_AND_PURGE point is reached in the Trigger. So it feels like I've completely misunderstood how the Trigger is supposed to work, and that my implementation is not doing what I think it's doing.

I guess my question is: when should clear be called by the Trigger? And is this the correct way to implement a combined EventTimeTrigger and ProcessingTimeTrigger?

Thankful for all the help I can get.

UPDATE 1: (2020-05-29)

To provide some more information about how things are set up, I configure my environment as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(60, Time.of(60, TimeUnit.MINUTES), Time.of(60, TimeUnit.SECONDS)));
env.enableCheckpointing(5000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

So I use EventTime for the entire stream. I then create the windows like this:

DataStream<PlayerSession> playerSessions = sideEvents
                .keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
                .window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
                .trigger(new EventTimeProcessingTimeTrigger(SESSION_TIMEOUT))
                .aggregate(new SessionAggregator())
                .name("Aggregate events into sessions")
                .uid("Aggregate events into sessions");

Recommended answer

This situation is complex. I hesitate to predict exactly what this code will do, but I can explain some of what's going on.

Point 1: You have set the time characteristic to event time, arranged for timestamps and watermarks, and implemented an onEventTime callback in your Trigger. But nowhere are you creating an event time timer. Unless I've missed something, nothing is actually using event time or watermarks. You haven't implemented an event time trigger, and I would not expect onEventTime to ever be called.

Point 2: Your trigger doesn't need to call clear. Flink takes care of calling clear on triggers as part of purging windows.

Point 3: Your trigger is trying to fire and purge the window repeatedly, which doesn't seem right. I say this because you are creating a new processing time timer for every element, and when each timer fires, you are firing and purging the window. You can fire the window as often as you like, but you can only purge the window once, after which it is gone.
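
To make Point 3 concrete, here is a minimal plain-Java sketch of the timer bookkeeping only. It does not use any Flink classes; the class and method names are made up for illustration, and the 1-minute heartbeats and 2-minute timeout are taken from the question. It contrasts registering one timer per element with keeping only the most recent deadline.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the timer bookkeeping only; it does not use Flink classes.
class InactivityTimerModel {

    // One timer per element, never deleted (the pattern used by the trigger in the question).
    static List<Long> timerPerElement(long[] arrivals, long timeout) {
        List<Long> timers = new ArrayList<>();
        for (long t : arrivals) {
            timers.add(t + timeout);
        }
        return timers;
    }

    // Replace the previous timer on every element, so only the latest deadline survives.
    static List<Long> latestTimerOnly(long[] arrivals, long timeout) {
        List<Long> timers = new ArrayList<>();
        for (long t : arrivals) {
            timers.clear();          // stands in for deleting the old timer
            timers.add(t + timeout); // stands in for registering the new one
        }
        return timers;
    }

    public static void main(String[] args) {
        long[] heartbeats = {0L, 60_000L, 120_000L}; // one event per minute
        long timeout = 120_000L;                     // 2-minute inactivity timeout
        System.out.println(timerPerElement(heartbeats, timeout)); // [120000, 180000, 240000]
        System.out.println(latestTimerOnly(heartbeats, timeout)); // [240000]
    }
}
```

In the first variant a timer fires at 120000 ms even though an event arrived at that very moment, so the session is emitted while it is still active. In a real trigger the previous deadline would also have to live in per-window state rather than in an instance field such as lastSetTimer, because one trigger instance serves many keys and windows.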

Point 4: Session windows are a special kind of window, known as merging windows. When sessions merge (which happens all the time, as events arrive), their triggers are merged, and one of them gets cleared. This is why you see clear being called so frequently.
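
The merging described in Point 4 can be sketched with a small plain-Java model. This is not Flink's implementation; SessionMergeModel and its fields are hypothetical names. It assumes that each event first gets its own window [timestamp, timestamp + gap] and that overlapping or touching windows are collapsed into one. The counter tracks how many existing windows were absorbed, which is roughly where a clear call would show up in the debugger.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of session-window merging; not Flink's actual implementation.
class SessionMergeModel {
    final long gap;
    final List<long[]> windows = new ArrayList<>(); // each entry is {start, end}
    int absorbed = 0; // existing windows replaced by a merged window

    SessionMergeModel(long gap) {
        this.gap = gap;
    }

    void add(long timestamp) {
        long start = timestamp;
        long end = timestamp + gap;
        List<long[]> kept = new ArrayList<>();
        for (long[] w : windows) {
            if (w[0] <= end && w[1] >= start) {
                // Overlapping (or touching) windows collapse into one.
                start = Math.min(start, w[0]);
                end = Math.max(end, w[1]);
                absorbed++;
            } else {
                kept.add(w);
            }
        }
        kept.add(new long[] {start, end});
        windows.clear();
        windows.addAll(kept);
    }

    public static void main(String[] args) {
        SessionMergeModel model = new SessionMergeModel(120_000L);
        for (long t : new long[] {0L, 60_000L, 120_000L, 500_000L}) {
            model.add(t);
        }
        // Three heartbeats end up in one session; the late event starts a second one.
        System.out.println(model.windows.size() + " sessions, " + model.absorbed + " windows absorbed");
    }
}
```

Every heartbeat that lands inside an open session causes a merge in this model, so with one event per minute the cleanup path runs about once a minute per session even though nothing has been fired.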

Suggestion: since you have once-a-minute keepalives, and intend to close sessions after 2 minutes of inactivity, it seems like you could set the session gap to 2 minutes, and that would avoid a fair bit of what's making things so complex. Let the session windows do what they're designed to do.

Assuming that will work, you could simply extend Flink's ProcessingTimeTrigger and override its onElement method to do this (if your Flink version does not allow subclassing ProcessingTimeTrigger, for example because its constructor is private, copy its logic into your own Trigger instead):

@Override
public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {

    if (endSession.contains(element.get(Field.EVENT_TYPE))) {
        return TriggerResult.FIRE_AND_PURGE;
    }

    // Otherwise fall back to the superclass behavior (fire when the session gap expires)
    return super.onElement(element, timestamp, window, ctx);
}

In this fashion the window will be triggered after two minutes of inactivity, or by an explicit session-ending event.

You should be able to simply inherit the rest of ProcessingTimeTrigger's behavior.
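
As a rough illustration of the resulting behavior, the following plain-Java model emits a session either on an end event or once the gap between two events exceeds the session gap. It is only a sketch and not Flink code: SessionEmitModel is a made-up name, "Playback.Started" and "Heartbeat" are assumed event labels, and only two of the end-session event types from the question are listed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Plain-Java model of "emit on end event or after the gap"; not Flink code.
class SessionEmitModel {
    static final Set<String> END_EVENTS = Set.of("Playback.Completed", "Playback.Aborted");

    // Returns one entry per emitted session: "<eventCount> events, <reason>".
    // Events are assumed to belong to one sessionId and to be sorted by time.
    static List<String> emit(long[] times, String[] types, long gap) {
        List<String> out = new ArrayList<>();
        int count = 0;
        long last = 0L;
        for (int i = 0; i < times.length; i++) {
            if (count > 0 && times[i] - last > gap) {
                out.add(count + " events, timeout"); // gap expired before this event
                count = 0;
            }
            count++;
            last = times[i];
            if (END_EVENTS.contains(types[i])) {
                out.add(count + " events, end event"); // explicit session end
                count = 0;
            }
        }
        if (count > 0) {
            out.add(count + " events, timeout"); // no further events arrive
        }
        return out;
    }

    public static void main(String[] args) {
        long[] times = {0L, 60_000L, 120_000L, 400_000L, 460_000L};
        String[] types = {"Playback.Started", "Heartbeat", "Playback.Completed",
                          "Playback.Started", "Heartbeat"};
        System.out.println(emit(times, types, 120_000L));
        // [3 events, end event, 2 events, timeout]
    }
}
```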

If you want to use event time, then use EventTimeTrigger as the superclass, and you'll have to find a way to make sure that your watermarks make progress even when the stream becomes idle. See this answer for how to handle that.
