Apache Flink CEP: how to detect if an event did not occur within x seconds?


Problem description

For example, A should be followed by B within 10 seconds. I know how to detect when this DOES occur (.next, .within), but I want to send an alert if B does not arrive within the window.

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpointing is required for exactly-once or at-least-once guarantees
        // env.enableCheckpointing(1000);

        final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
            .setHost("localhost")
            .setPort(5672)
            .setVirtualHost("/")
            .setUserName("guest")
            .setPassword("guest")
            .build();

        final DataStream<String> inputStream = env
            .addSource(new RMQSource<String>(
                connectionConfig,               // config for the RabbitMQ connection
                "cep",                          // name of the RabbitMQ queue to consume
                true,                           // use correlation ids; can be false if only at-least-once is required
                new SimpleStringSchema()))      // deserialization schema to turn messages into Java objects
            .setParallelism(1);                 // non-parallel source is only required for exactly-once

        inputStream.print();

        Pattern<String, ?> simplePattern =
                Pattern.<String>begin("start")
                    .where(new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String event) {
                            return event.equals("A");
                        }
                    })
                    .next("end")
                    .where(new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String event) {
                            return event.equals("B");
                        }
                    });

        PatternStream<String> timedOutPatternStream = CEP.pattern(inputStream, simplePattern.within(Time.seconds(10)));
        OutputTag<String> timedout = new OutputTag<String>("timedout"){};
        SingleOutputStreamOperator<String> timedOutNotificationsStream = timedOutPatternStream.flatSelect(
            timedout,
            new TimedOut<String>(),
            new FlatSelectNothing<String>()
        );
        timedOutNotificationsStream.getSideOutput(timedout).print();

        env.execute("mynotification");
    }

    // Emits an alert for every partial match (an "A" without a "B") that timed out.
    // The type parameter is named T so that it no longer shadows java.lang.String.
    public static class TimedOut<T> implements PatternFlatTimeoutFunction<T, String> {
        @Override
        public void timeout(Map<String, List<T>> pattern, long timeoutTimestamp, Collector<String> out) throws Exception {
            out.collect("LATE!");
        }
    }

    // Deliberately emits nothing for complete matches; only timeouts are of interest.
    public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
        @Override
        public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {}
    }

Actual behavior:

publish "A"
(wait 5 seconds)
publish "B"
=> (no alert)

publish "A"
(wait 10 seconds)
=> (no alert, but should be)

publish "A"
(wait 10 seconds)
publish "B"
=> "LATE!"

Expected behavior:

publish "A"
(wait 10 seconds)
=> "LATE!"

Solution

You can do this with timed-out patterns. Specify a pattern such as "A followedBy B within 10 seconds" and handle the partial matches that time out, that is, the cases where only an A was seen. The Flink CEP documentation describes how to handle timed-out partial patterns.

For a full example, refer to this training, or go straight to the solution to the exercise.

EDIT: Currently (Flink < 1.5), in processing time, pruning is done only when an element arrives. Unfortunately, this means that after the timeout at least one more event (matching or not) must arrive to trigger the timeout. Efforts to improve this can be tracked in this JIRA ticket.
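
To illustrate why the alert only shows up once another event arrives, here is a minimal sketch in plain Java. It is a toy model, not Flink code: the class `TimeoutPruningSketch` and its methods `prune` and `onEvent` are hypothetical names made up for this illustration, and it tracks only a single pending "A". It assumes pruning behaves as described above, i.e. timed-out partial matches are checked only when an element is processed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Flink code: tracks a single partial match "A seen, waiting for B".
final class TimeoutPruningSketch {
    private final long windowMillis;
    private Long pendingStart; // processing time of the unmatched "A", or null if none

    TimeoutPruningSketch(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Emits "LATE!" if the pending "A" has reached the end of the window, then drops it.
    List<String> prune(long nowMillis) {
        List<String> alerts = new ArrayList<>();
        if (pendingStart != null && nowMillis - pendingStart >= windowMillis) {
            alerts.add("LATE!");
            pendingStart = null;
        }
        return alerts;
    }

    // Element-driven behavior: pruning runs only because an element arrived.
    List<String> onEvent(String event, long nowMillis) {
        List<String> alerts = prune(nowMillis);
        if (event.equals("A")) {
            pendingStart = nowMillis; // start (or restart) a partial match
        } else if (event.equals("B")) {
            pendingStart = null;      // B arrived in time, or nothing was pending
        }
        return alerts;
    }

    public static void main(String[] args) {
        TimeoutPruningSketch elementDriven = new TimeoutPruningSketch(10_000);
        System.out.println(elementDriven.onEvent("A", 0));      // prints []
        // More than 10 seconds pass with no input: nothing calls prune(), so no alert yet.
        System.out.println(elementDriven.onEvent("B", 12_000)); // prints [LATE!]

        TimeoutPruningSketch timerDriven = new TimeoutPruningSketch(10_000);
        timerDriven.onEvent("A", 0);
        // A timer tick calls prune() without any new element arriving.
        System.out.println(timerDriven.prune(10_000));          // prints [LATE!]
    }
}
```

The first half reproduces the behavior from the question: the timeout is only noticed when the late "B" is processed. The second half shows what the expected behavior would need, namely something that calls the pruning step on a clock tick rather than on an incoming element.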
