Apache Camel - Split and aggregate - Old Exchange is always null


Problem description

I see that this question has been asked a number of times, but none of the posts helped or had a conclusive solution. I am splitting a message and then aggregating it using Aggregator2. The code was throwing an exception because oldExchange was always null. So, to test, I designed a small piece of code.

I read an orders.xml file which looks like this:

<Orders xmlns="http://some/schema/Order">
    <Order>
            <orderNum>1</orderNum>
    </Order>
    <Order>
            <orderNum>2</orderNum>
    </Order>
    <Order>
            <orderNum>3</orderNum>
    </Order>
    <Order>
            <orderNum>5</orderNum>
    </Order>
    <Order>
            <orderNum>6</orderNum>
    </Order>
</Orders>

My camel Context Looks like this

<camel:route>
    <camel:from uri="file:src/data/catask/test?noop=true"/>
    <camel:log message="${body}"/>
    <camel:split>
        <camel:xpath>//te:Orders/*</camel:xpath>
        <camel:to uri="direct:logQueries"/>
        <camel:to uri="direct:aggegateQueries"/>
    </camel:split>
</camel:route>

<camel:route>
    <camel:from uri="direct:logQueries"/>
    <camel:log message="After the call : \n ${body}"/>
</camel:route>

<camel:route>
    <camel:from uri="direct:aggegateQueries"/>
    <camel:aggregate strategyRef="aggrTask" completionInterval="8000">
        <camel:correlationExpression>
            <camel:xpath>//te:Order</camel:xpath>
        </camel:correlationExpression>
        <camel:to uri="file:src/data/catask/output?fileName=output.xml"/>
    </camel:aggregate>
</camel:route>

My Aggregation Strategy class looks like this

   public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
            if (oldExchange == null) { 
            System.out.println("Returning new exchange"); 
                return newExchange; 
            } 

            String oldBody = oldExchange.getIn().getBody(String.class); 
            String newBody = newExchange.getIn().getBody(String.class); 
            oldExchange.getIn().setBody(oldBody + "+" + newBody); 
            return oldExchange; 
        } 
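For reference, here is a minimal sketch of the full class this method could live in, assuming Camel 2.x (the class name is made up; the original only shows the method, and the routes above imply it is registered as the aggrTask bean):

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

// Hypothetical wrapper class; the XML routes reference it via strategyRef="aggrTask",
// so it would be registered as a bean with that id in the Spring context.
public class OrderAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // First exchange of a correlation group: nothing to merge with yet
        if (oldExchange == null) {
            System.out.println("Returning new exchange");
            return newExchange;
        }
        // Append the new body to the body accumulated so far
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(oldBody + "+" + newBody);
        return oldExchange;
    }
}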

The problem is that when the aggregated result is saved to the output.xml file, it contains only the last record read from Orders.xml.

i.e.

<Order xmlns="http://some/schema/Order">
            <orderNum>6</orderNum>
    </Order>

I looked into it further and found that this was happening because after the first call oldExchange should have some value, but it turns out it is always null. I think that is because it is reading everything from a single file and splitting it, so there is only one exchange.

> Any suggestions??

UPDATE 1: Per Claus, I can use the Splitter alone to solve this issue. I did that and was able to successfully join all the messages. However, I am still looking for a way to use Aggregator2. Here is how I did it using the Splitter only.

<camel:route>
    <camel:from uri="file:src/data/catask/test?noop=true"/>
    <camel:log message="${body}"/>
    <camel:split strategyRef="aggrTask">
        <camel:xpath>//te:Orders/*</camel:xpath>
        <camel:to uri="direct:logQueries"/>
    </camel:split>
</camel:route>

<camel:route>
    <camel:from uri="direct:logQueries"/>
    <camel:log message="After the call : \n ${body}"/>
</camel:route>
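For comparison, a rough Java DSL equivalent of this splitter-only route (just a sketch, assuming Camel 2.x and reusing the hypothetical OrderAggregationStrategy class sketched above; the structure mirrors the XML):

import org.apache.camel.builder.RouteBuilder;

// Sketch of the splitter-only variant (composed message processor EIP) in the Java DSL
public class SplitOnlyRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("file:src/data/catask/test?noop=true")
            .log("${body}")
            // The splitter re-joins the split parts with the strategy itself,
            // so no separate aggregator is needed
            .split(xpath("//te:Orders/*").namespace("te", "http://some/schema/Order"),
                   new OrderAggregationStrategy())
                .to("direct:logQueries")
            .end();

        from("direct:logQueries")
            .log("After the call : \n ${body}");
    }
}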

Solution

I think I figured out how to aggregate the messages using the Aggregator. I added a header named id and used it as my correlation id.

<camel:route>
    <camel:from uri="file:src/data/catask/test?noop=true"/>
    <camel:log message="${body}"/>
    <camel:split>
        <camel:xpath>//te:Orders/*</camel:xpath>
        <camel:to uri="direct:addHeaders"/>
        <camel:to uri="direct:aggegateQueries"/>
    </camel:split>
</camel:route>

<camel:route>
    <camel:from uri="direct:addHeaders"/>
    <camel:setHeader headerName="id">
        <camel:constant>order</camel:constant>
    </camel:setHeader>
</camel:route>

<camel:route>
    <camel:from uri="direct:aggegateQueries"/>
    <camel:aggregate strategyRef="aggrTask" completionInterval="8000">
        <camel:correlationExpression>
            <simple>header.id</simple>
        </camel:correlationExpression>
        <camel:to uri="file:src/data/catask/output?fileName=output.xml"/>
        <camel:log message="MERGED:: \n ${body}"/>
    </camel:aggregate>
</camel:route>
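The same solution sketched in the Java DSL (again an assumption-laden sketch for Camel 2.x, reusing the hypothetical OrderAggregationStrategy class from above); the point is that the constant header value puts every split part into a single correlation group:

import org.apache.camel.builder.RouteBuilder;

// Sketch of the header-based correlation variant in the Java DSL
public class SplitAndAggregateRoute extends RouteBuilder {

    @Override
    public void configure() {
        from("file:src/data/catask/test?noop=true")
            .log("${body}")
            .split(xpath("//te:Orders/*").namespace("te", "http://some/schema/Order"))
                .to("direct:addHeaders")
                .to("direct:aggegateQueries")
            .end();

        // Every split part gets the same constant header value ...
        from("direct:addHeaders")
            .setHeader("id", constant("order"));

        // ... so the aggregator correlates them all into one group
        from("direct:aggegateQueries")
            .aggregate(header("id"), new OrderAggregationStrategy())
                .completionInterval(8000)
                .to("file:src/data/catask/output?fileName=output.xml")
                .log("MERGED:: \n ${body}")
            .end();
    }
}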

This aggregates my messages. However, I am still not sure why, despite using the correct XPath, Camel thinks these are different types of messages.

Copying Claus's explanation from the Camel forums: "Looks like its your correlation expression that is a new group for each message, eg each xpath result is different. If you want to split and join the same messages then see this eip http://camel.apache.org/composed-message-processor.html And see the example using only splitter. That is much easier to do."

I tested the XPath expression using an XPath evaluator tool and also printed out the correlation expression results, and all my messages with //Order are the same. Ex-

Group 1: 
<Order>  
 <orderNum>1</orderNum>  
</Order>  

 Group 2: 
 <Order>  
  <orderNum>2</orderNum>  
 </Order> 
