骆驼聚合策略 [英] Camel aggregation strategy

查看:159
本文介绍了骆驼聚合策略的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我解析一个CSV文件,其分裂和骆驼路由,通过多个处理器。有两个端点,有错误的数据之一,而其他已经验证的数据。

I am parsing a CSV file, splitting it and routing it through multiple processors in camel. There are two endpoints , one having erroneous data while other has validated data.

我需要建议的汇总数据。

I need suggestion in aggregating the data.

假设CSV文件有10条记录了其中6个达到了一个端点,而达到4到另一个。我怎么能知道,如果所有10已经从每个端点文件完成,前进聚合器。
我需要创建两个文件中的一个有效数据和其他与单个文件损坏的数据。

Let's say the CSV file has 10 records out of which 6 reached one endpoint while 4 reached to another. How can I know if all 10 has completed from the file at each endpoint and move ahead of aggregator. I need to create two files one with valid data and other with corrupt data from a single file.

推荐答案

让我们看看分离器的回报是什么。

Lets look at what the splitter returns.

据骆驼2.2的文档。以上的分离器将默认使用你的榜样返回最后一个分裂的消息这很可能是最后一行来完成它的处理器使10号线(使用你的例子),它可能不是。

According to the documentation on Camel 2.2. or older the splitter will by default return the last split message using your example this would probably be the last line to complete its processor so it might not be line 10 (using your example).

在骆驼2.3和较新的分离器将默认返回原始的输入信息,即所有10条线。这是默认的行为,你不需要code什么这个工作。当分路器默认情况下,完成了它会把这个消息传递给下一个终点。

On Camel 2.3 and newer the splitter will by default return the original input message i.e. all 10 lines. This is the default behavior and you dont need to code anything for this to work. When the splitter is finished by default it will pass this message along to the next end point.

所以,如果我是用骆驼2.3或以下DSL新:

So if I was using the following DSL on Camel 2.3 or newer:

<camelContext trace="false" id="blueprintContext" xmlns="http://camel.apache.org/schema/blueprint">
<route id="splitExample">
    <from uri="timer://myTimer?period=2000"/>
    <setBody>
        <simple>A\nB\nC</simple>

    </setBody>

    <log message="The message body before the splitter contains ${body}"/>
    <split>
        <tokenize token="\n"></tokenize>

        <log message="Split line ${body}"/>
    </split>
    <log message="The message body after the splitter contains ${body}"/>
</route>
</camelContext>  

下面将出现在日志中:

The following would appear in the log:

 INFO  The message body before the splitter contains 
       A
       B
       C
 INFO  Split line A
 INFO  Split line B
 INFO  Split line C
 INFO  The message body after the splitter contains 
       A
       B
       C

正如你所看到的骆驼默认情况下,结合信息返回到一个分离器返回后。要覆盖此行为,你需要实现自己的聚合。要做到这一点创建一个类可以称之为 MyAggregationStrategy ,使类实现 AggregationStrategy 。我使用 rel=\"nofollow\">的例子。例如,我们将总计流入出价和希望聚合的出价最高。

As you can see camel by default combines the messages back into one after the splitter returns. To override this behavior you would need to implement your own aggregator. To do so create a class lets call it MyAggregationStrategy and make the class implement AggregationStrategy. I used the example in the apache documentation from here. example we will aggregate incoming bids and want to aggregate the highest bid.

private static class MyAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) 
    {
        if (oldExchange == null) 
        { 
           // the first time we only have the new exchange so it wins the first round
           return newExchange;
        }
        int oldPrice = oldExchange.getIn().getBody(Integer.class);
        int newPrice = newExchange.getIn().getBody(Integer.class);
        // return the "winner" that has the highest price
        return newPrice > oldPrice ? newExchange : oldExchange;
    }
}

您已经做了之后,你再告诉分离器通过执行以下操作使用您的聚合:

After you have done this you then tell the splitter to use your aggregator by doing the following:

春/ XML DSL:

Spring/XML DSL:

<split  strategyRef="MyAggregationStrategy ">

在Java的:

from("direct:start")
// aggregated by header id and use our own strategy how to aggregate
.aggregate(new MyAggregationStrategy())

希望这给了你分路器是如何工作的足够的洞察力。你的情况我可能会为每个行指示标头值,如果它是成功还是失败的话,我会用我的客户聚合失败的和成功分为两个列表作为邮件正文以创建一个新的消息。一个列表中的失败,一个列表与已完成的行项目。

Hopefully this gives you enough insight about how the splitter works. In your case I would probably set a header value for each line indicating if it was successful or failed then I would use my customer aggregator to create a new message with failed and success grouped into two lists as the message body. One list with the failed and one list with the completed line items.

此新聚集消息然后可被发送到一个处理器或另一个端点用于进一步处理。例如,您可以再取故障列表,发送到其产生的文件的路径。在 SEDA 组件可以帮助很多在这里。

This new aggregated message can then be sent to a processor or another endpoint for further processing. For example you can then take the failed list and send that to a route which produces a file. The seda component can help a lot here.

这篇关于骆驼聚合策略的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆