如何聚合一个消息与骆驼聚集多个组? [英] How to aggregate one message into multiple group with camel aggregate?

查看:128
本文介绍了如何聚合一个消息与骆驼聚集多个组?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想产生连续的市场数据的聚合视图,这意味着我们需要计算总和值每2消息。说来的数据作为:

I'm trying to generate a aggregate view of consecutive market data, which means we need to calculate the sum value every 2 message. say the data coming in as:

(V0,T0),(V1,T1),(V2,T2),(V3,T3)....

V 表示值 T 表示时间戳当我们收到的数据。

V means value T means timestamp when we receive the data.

我们需要为每台2分的总和说:

We need to generate the sum for every 2 points say:

(R1=Sum(V0,V1),T1),(R2=Sum(V1,V2),T2),(R3=Sum(V2,V3),T3),....

任何建议,我们如何能做到这一点,通过使用 aggregator2 或我们需要写这个处理器?

Any suggestion how can we do this by using aggregator2 or we need to write a processor for this?

推荐答案

阅读聚合源$ C ​​$ C后,事实证明,骆驼只有骨料一个消息给一组,我们必须建立一个聚合的此目的。这里是code:

After reading the source code of Aggregator, it turns out that camel only aggregate one message to one group, we have to build a "aggregator" for this purpose. here is the code:

public abstract class GroupingGenerator<I> implements Processor {
private final EvictingQueue<I> queue;
private final int size;

public int getSize() {
    return size;
}

public GroupingGenerator(int size) {
    super();
    this.size = size;
    this.queue = EvictingQueue.create(size);
}

@SuppressWarnings("unchecked")
@Override
public void process(Exchange exchange) throws Exception {
    queue.offer((I) exchange.getIn().getBody());
    if (queue.size() != size) {
        exchange.setProperty(Exchange.ROUTE_STOP, true);
        return;
    } else {
        processGroup(queue, exchange);
    }
}

protected abstract void processGroup(Collection<I> items, Exchange exchange);

}

这篇关于如何聚合一个消息与骆驼聚集多个组?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆