Is it possible to process multiple streams in Apache Flink CEP?


Question

My question is: if we have two raw event streams, i.e. Smoke and Temperature, and we want to find out whether a complex event, i.e. Fire, has happened by applying operators to the raw streams, can we do this in Flink?

I am asking this because all the Flink CEP examples I have seen so far involve only one input stream. Please correct me if I am wrong.

Recommended answer

Short answer - yes, you can read and process multiple streams and fire rules based on the event types coming from the different stream sources.
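For the Smoke/Temperature example from the question, one straightforward approach is to map both raw streams to a common event type and union them before applying a single CEP pattern. The following is only a minimal sketch: smokeStream, temperatureStream, and their field names are hypothetical, and it assumes the common BAMEvent type defined later in this answer has an all-args constructor.

// Map each hypothetical raw stream to the common BAMEvent type, then union
// them so one CEP pattern can correlate smoke and temperature readings.
DataStream<BAMEvent> smokeEvents = smokeStream
        .map(s -> new BAMEvent(s.getSensorId(), "smoke", s.getId(), s.getTimestamp()))
        .returns(BAMEvent.class);

DataStream<BAMEvent> temperatureEvents = temperatureStream
        .map(t -> new BAMEvent(t.getSensorId(), "temperature", t.getId(), t.getTimestamp()))
        .returns(BAMEvent.class);

DataStream<BAMEvent> unifiedEvents = smokeEvents.union(temperatureEvents);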

Long answer - I had a somewhat similar requirement, and my answer is based on the assumption that you are reading the different streams from different Kafka topics.

Read from the different topics, each streaming different events, with a single source:

// One Kafka source consuming all three topics; the custom deserialization
// schema turns each record into a common BAMEvent.
FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
        Arrays.asList("topicStream1", "topicStream2", "topicStream3"),
        new StringSerializerToEvent(),
        props);

// Assign event timestamps and watermarks directly on the source.
kafkaSource.assignTimestampsAndWatermarks(new TimestampAndWatermarkGenerator());

DataStream<BAMEvent> events = env.addSource(kafkaSource)
        .filter(Objects::nonNull);
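The TimestampAndWatermarkGenerator used above is not shown in the answer. A minimal sketch, assuming bounded out-of-orderness and that BAMEvent.timestamp is already in milliseconds (BoundedOutOfOrdernessTimestampExtractor and Time come from org.apache.flink.streaming.api.functions.timestamps and org.apache.flink.streaming.api.windowing.time.Time), could look like this:

// Assumed sketch of the assigner referenced above: uses the event's own
// timestamp and tolerates events arriving up to 5 seconds out of order.
public class TimestampAndWatermarkGenerator
        extends BoundedOutOfOrdernessTimestampExtractor<BAMEvent> {

  public TimestampAndWatermarkGenerator() {
    super(Time.seconds(5)); // assumed out-of-orderness bound
  }

  @Override
  public long extractTimestamp(BAMEvent event) {
    return event.getTimestamp();
  }
}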

The serializer reads the data and parses it into a common format - for example:

@Data
public class BAMEvent {
  private String keyid;      // if key-based partitioning is needed
  private String eventName;  // distinguishes the different types of events
  private String eventId;    // any other field you need
  private long timestamp;    // for event-time based processing

  @Override
  public String toString() {
    return eventName + " " + timestamp + " " + eventId + " " + keyid;
  }
}
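The StringSerializerToEvent schema passed to the Kafka source above is also not shown. Below is only a sketch: it assumes each record is a comma-separated string of keyid,eventName,eventId,timestamp (the wire format is an assumption), and that the class implements Flink's DeserializationSchema (whose package differs between Flink versions).

// Hypothetical deserialization schema: parses an assumed comma-separated
// record "keyid,eventName,eventId,timestamp" into a BAMEvent.
public class StringSerializerToEvent implements DeserializationSchema<BAMEvent> {

  @Override
  public BAMEvent deserialize(byte[] message) throws IOException {
    String[] fields = new String(message, StandardCharsets.UTF_8).split(",");
    BAMEvent event = new BAMEvent();
    event.setKeyid(fields[0]);
    event.setEventName(fields[1]);
    event.setEventId(fields[2]);
    event.setTimestamp(Long.parseLong(fields[3]));
    return event;
  }

  @Override
  public boolean isEndOfStream(BAMEvent nextElement) {
    return false; // the Kafka topics are unbounded
  }

  @Override
  public TypeInformation<BAMEvent> getProducedType() {
    return TypeInformation.of(BAMEvent.class);
  }
}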

After this, things are pretty straightforward: define the rules based on the event name, and compare the event names inside the conditions (you can also define complex rules, as follows):

Pattern<BAMEvent, ?> pattern = Pattern.<BAMEvent>begin("first")
        .where(new SimpleCondition<BAMEvent>() {
          private static final long serialVersionUID = 1390448281048961616L;

          @Override
          public boolean filter(BAMEvent event) throws Exception {
            return event.getEventName().equals("event1");
          }
        })
        .followedBy("second")
        .where(new IterativeCondition<BAMEvent>() {
          private static final long serialVersionUID = -9216505110246259082L;

          @Override
          public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {

            if (!secondEvent.getEventName().equals("event2")) {
              return false;
            }

            // Only match when the second event correlates with a first event.
            for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
              if (secondEvent.getEventId().equals(firstEvent.getEventId())) {
                return true;
              }
            }
            return false;
          }
        })
        .within(withinTimeRule);
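The answer stops at defining the pattern; to actually detect the complex event, the pattern has to be applied to the stream. A minimal sketch (assuming Flink 1.3+, where the select function receives a Map<String, List<BAMEvent>>, the CEP classes from the flink-cep dependency, and keying by keyid so only events from the same key are correlated) might look like this:

// Apply the pattern to the event stream and emit a simple alert string.
PatternStream<BAMEvent> patternStream =
        CEP.pattern(events.keyBy("keyid"), pattern);

DataStream<String> alerts = patternStream.select(
        new PatternSelectFunction<BAMEvent, String>() {
          @Override
          public String select(Map<String, List<BAMEvent>> match) throws Exception {
            BAMEvent first = match.get("first").get(0);
            BAMEvent second = match.get("second").get(0);
            return "Complex event detected: " + first + " followed by " + second;
          }
        });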

I hope this gives you an idea of how to integrate one or more different streams together.

