Siddhi CEP: Aggregate the String values of grouped events in a batch time window


Question

I'm using Siddhi to reduce the number of events existing in a system. To do so, I declared a batch time window that groups all the events based on their target_ip.

from Events#window.timeBatch(30 sec)
select id as meta_ID, Target_IP4 as target_ip
group by Target_IP4
insert into temp;

The result I would like to have is a single event per target_ip, with the meta_ID parameter value being the concatenation of the distinct meta_IDs of the events that form the group.

The problem is that the previous query generates as many events as there are distinct meta_ID values. For example, I'm getting


  1. id_10, target_1

  2. id_11, target_1

and I would like to have


  1. id_10,id_11, target_1

I'm aware that some aggregation method is missing from my query. I saw a lot of aggregation functions in Siddhi, including the siddhi-execution-string extension, which has the method str:concat, but I don't know how to use it to aggregate the meta_ID values. Any idea?

Answer

You could write an execution plan as shown below to achieve your requirement:

define stream inputStream (id string, target string);

-- Query 1
from inputStream#window.timeBatch(30 sec)
select *
insert into temp;

-- Query 2
from temp#custom:aggregator(id, target) 
select *
insert into reducedStream;

Here, custom:aggregator is a custom stream processor extension that you will have to implement. You can follow [1] when implementing it.

Let me explain a bit about how things work:

Query 1 generates a batch of events every 30 seconds. In other words, we use Query 1 to create a batch of events.

So, at the end of every 30-second interval, the batch of events will be fed into the custom:aggregator stream processor. When input is received, the stream processor's process() method will be invoked.

@Override
protected void process(ComplexEventChunk<StreamEvent> streamEventChunk, Processor nextProcessor,
                       StreamEventCloner streamEventCloner, ComplexEventPopulater complexEventPopulater) {
    // implement the aggregation & grouping logic here
}

The batch of events is available in the streamEventChunk. When implementing the process() method, you can iterate over the streamEventChunk and create one output event per target. You will need to implement this logic in the process() method.
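The grouping and concatenation that process() must perform can be sketched in plain Java. This is only an illustration of the logic, not the actual extension API: Siddhi's ComplexEventChunk and StreamEvent types are left out, and a simple record stands in for an event carrying the (meta_ID, target_ip) output data.

```java
import java.util.*;

public class AggregatorSketch {

    // Stand-in for a stream event carrying (id, target) output data.
    public record Event(String id, String target) {}

    // Iterate the batch, group the ids by target, and emit one event per
    // target with the ids concatenated into a single comma-separated value.
    public static List<Event> aggregate(List<Event> chunk) {
        Map<String, StringJoiner> byTarget = new LinkedHashMap<>();
        for (Event e : chunk) {
            byTarget.computeIfAbsent(e.target(), k -> new StringJoiner(",")).add(e.id());
        }
        List<Event> out = new ArrayList<>();
        byTarget.forEach((target, ids) -> out.add(new Event(ids.toString(), target)));
        return out;
    }
}
```

With the batch from the example, aggregate() turns (id_10, target_1) and (id_11, target_1) into the single event (id_10,id_11, target_1). Inside a real extension, the equivalent step would build new StreamEvent instances and pass them to the next processor.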

[1] https://docs.wso2.com/display/CEP420/Writing+a+Custom+Stream+Processor+Extension
