Apache Flink CEP如何检测事件是否在x秒内未发生? [英] Apache Flink CEP how to detect if event did not occur within x seconds?

查看:129
本文介绍了Apache Flink CEP如何检测事件是否在x秒内未发生?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

例如,A应该在10秒之内紧跟B.我知道如何跟踪此DID是否发生(.next,.within),但是如果B从未在窗口内发生,我想发送警报.

For example A should be followed by B within 10 seconds. I know how to track if this DID occur (.next, .within), but I want to send an alert if B never happened within the window.

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpointing is required for exactly-once or at-least-once guarantees
//      env.enableCheckpointing(1000);

        final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
            .setHost("localhost")
            .setPort(5672)
            .setVirtualHost("/")
            .setUserName("guest")
            .setPassword("guest")
            .build();

        final DataStream<String> inputStream = env
            .addSource(new RMQSource<String>(
                connectionConfig,               // config for the RabbitMQ connection
                "cep",                          // name of the RabbitMQ queue to consume
                true,                           // use correlation ids; can be false if only at-least-once is required
                new SimpleStringSchema()))      // deserialization schema to turn messages into Java objects
            .setParallelism(1);                 // non-parallel source is only required for exactly-once

        inputStream.print();

        Pattern<String, ?> simplePattern =
                Pattern.<String>begin("start")
                    .where(new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String event) {
                            return event.equals("A");
                        }
                    })
                    .next("end")
                    .where(new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String event) {
                            return event.equals("B");
                        }
                    });

        PatternStream<String> timedOutPatternStream = CEP.pattern(inputStream, simplePattern.within(Time.seconds(10)));
        OutputTag<String> timedout = new OutputTag<String>("timedout"){};
        SingleOutputStreamOperator<String> timedOutNotificationsStream = timedOutPatternStream.flatSelect(
            timedout,
            new TimedOut<String>(),
            new FlatSelectNothing<String>()
        );
        timedOutNotificationsStream.getSideOutput(timedout).print();

        env.execute("mynotification");
    }

public static class TimedOut<String> implements PatternFlatTimeoutFunction<String, String> {
    @Override
    public void timeout(Map<java.lang.String, List<String>> pattern, long timeoutTimestamp, Collector<String> out) throws Exception {
        out.collect((String) "LATE!");
    }
}

public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
    @Override
    public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {}
}

实际行为:

publish "A"
(wait 5 seconds)
publish "B"
=> (no alert)

publish "A"
(wait 10 seconds)
=> (no alert, but should be)

publish "A"
(wait 10 seconds)
publish "B"
=> "LATE!"

预期行为:

publish "A"
(wait 10 seconds)
=> "LATE!"

推荐答案

您可以通过超时模式进行操作.您可以指定类似的模式 A在10秒之内紧随B之后,并检查超时的模式,这意味着只有A.您可以检查文档中是否存在超时模式

You can do it via timed out patterns. You can specify pattern like A followedBy B within 10 seconds and check for patterns that timed out, which means that there were only A's. You can check docs for timed out patterns here

有关完整示例,请参考此培训或直接

For a full example you can refer to this training or straight to the solution to the excercise.

现在,仅在传入元素上进行处理时间修剪(flink< 1.5).因此,不幸的是,在超时之后,必须至少有一个事件(如果匹配或不匹配,则不相关)将触发超时.可以使用此jira 门票

Right now (flink <1.5) in processing time pruning is done only on incoming element. Therefore unfortunately after the timeout there must be at least one event(irrelevant if matching or not) that will trigger the timeout. Efforts to improve it can be tracked with this jira ticket

这篇关于Apache Flink CEP如何检测事件是否在x秒内未发生?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆