Camel aggregation strategy


Question

I am parsing a CSV file, splitting it, and routing it through multiple processors in Camel. There are two endpoints: one receives erroneous data and the other receives validated data.

I need suggestions on aggregating the data.

Let's say the CSV file has 10 records, of which 6 reach one endpoint and 4 reach the other. How can I know that all 10 records from the file have completed at each endpoint, so that processing can move on past the aggregator? I need to create two files from a single input file: one with the valid data and one with the corrupt data.

Recommended answer

Let's look at what the splitter returns.

According to the documentation, on Camel 2.2 or older the splitter by default returns the last split message. In your example that would probably be the last line to finish its processor, so it might not be line 10.

On Camel 2.3 and newer the splitter by default returns the original input message, i.e. all 10 lines. This is the default behavior and you don't need to write any code for it to work. When the splitter finishes, it passes this message along to the next endpoint.

So if I were using the following DSL on Camel 2.3 or newer:

<camelContext trace="false" id="blueprintContext" xmlns="http://camel.apache.org/schema/blueprint">
<route id="splitExample">
    <from uri="timer://myTimer?period=2000"/>
    <setBody>
        <simple>A\nB\nC</simple>

    </setBody>

    <log message="The message body before the splitter contains ${body}"/>
    <split>
        <tokenize token="\n"></tokenize>

        <log message="Split line ${body}"/>
    </split>
    <log message="The message body after the splitter contains ${body}"/>
</route>
</camelContext>  

The following would appear in the log:

 INFO  The message body before the splitter contains 
       A
       B
       C
 INFO  Split line A
 INFO  Split line B
 INFO  Split line C
 INFO  The message body after the splitter contains 
       A
       B
       C

As you can see, by default Camel hands the original message back once the splitter returns. To override this behavior you need to implement your own aggregator. To do so, create a class, let's call it MyAggregationStrategy, and make it implement AggregationStrategy. I used the example from the Apache Camel documentation, which aggregates incoming bids and keeps the highest bid.

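// Imports needed at the top of the enclosing file (Camel 2.x package names;
// in Camel 3.x, AggregationStrategy lives in org.apache.camel):
// import org.apache.camel.Exchange;
// import org.apache.camel.processor.aggregate.AggregationStrategy;

// Declared as a nested class, e.g. inside your RouteBuilder.
private static class MyAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            // the first time we only have the new exchange, so it wins the first round
            return newExchange;
        }
        int oldPrice = oldExchange.getIn().getBody(Integer.class);
        int newPrice = newExchange.getIn().getBody(Integer.class);
        // return the "winner" that has the highest price
        return newPrice > oldPrice ? newExchange : oldExchange;
    }
}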
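To make the calling pattern concrete, here is a minimal plain-Java sketch (no Camel types) of how a strategy like this gets applied across the split parts: the result of each call is passed back in as the "old" value on the next call, and "old" is null on the first call. The class name AggregateFoldSketch and the helper fold are made up for illustration; in Camel the splitter does this looping for you.

```java
import java.util.List;
import java.util.function.BinaryOperator;

class AggregateFoldSketch {

    // Same contract as aggregate(oldExchange, newExchange):
    // oldPrice is null on the very first call.
    static Integer highestBid(Integer oldPrice, Integer newPrice) {
        if (oldPrice == null) {
            return newPrice;
        }
        return newPrice > oldPrice ? newPrice : oldPrice;
    }

    // Hypothetical helper: mimics how the result of one call is handed
    // back in as the "old" value for the next split part.
    static <T> T fold(List<T> parts, BinaryOperator<T> strategy) {
        T result = null;
        for (T part : parts) {
            result = strategy.apply(result, part);
        }
        return result;
    }
}
```

Calling `AggregateFoldSketch.fold(Arrays.asList(3, 7, 5), AggregateFoldSketch::highestBid)` walks the three bids in order and keeps the largest one.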

Once this is done, tell the splitter to use your aggregator as follows:

Spring/XML DSL:


<split strategyRef="MyAggregationStrategy">

In Java:

from("direct:start")
    // split the body into lines and combine the results with our own strategy
    .split(body().tokenize("\n"), new MyAggregationStrategy())
        .log("Split line ${body}")
    .end();

Hopefully this gives you enough insight into how the splitter works. In your case I would probably set a header on each line indicating whether it succeeded or failed, and then use a custom aggregator to create a new message whose body groups the lines into two lists: one with the failed lines and one with the successfully processed lines. Because the route only continues past the split block once every split part has been processed, the aggregated message already covers all 10 lines when it reaches the next step.
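As a rough illustration of that grouping logic, here is a plain-Java sketch that does not depend on Camel. The Line class is a hypothetical stand-in for an Exchange carrying a success/failure header, and the "valid" and "failed" keys are assumptions chosen for this example. In a real AggregationStrategy the same steps would read the header from newExchange and keep the two lists in the body of the exchange you return.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TwoListAggregationSketch {

    // Hypothetical stand-in for a Camel Exchange: a flag (playing the role
    // of a header set by the validating processor) plus the line itself.
    static class Line {
        final boolean valid;
        final String body;

        Line(boolean valid, String body) {
            this.valid = valid;
            this.body = body;
        }
    }

    // Same shape as aggregate(oldExchange, newExchange): "old" is null on
    // the first call, so the two lists are created there.
    static Map<String, List<String>> aggregate(Map<String, List<String>> old, Line line) {
        if (old == null) {
            old = new HashMap<>();
            old.put("valid", new ArrayList<>());
            old.put("failed", new ArrayList<>());
        }
        // Append the line to whichever list matches its flag.
        old.get(line.valid ? "valid" : "failed").add(line.body);
        return old;
    }
}
```

After the last line has been aggregated, the map holds every line from the file in exactly one of the two lists, which is the shape you need to write one file per list.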

This new aggregated message can then be sent to a processor or another endpoint for further processing. For example, you can take the failed list and send it to a route that produces a file. The seda component can help a lot here.
