Apache的骆驼 - 斯普利特和聚合 - 旧的Exchange总是空 [英] Apache Camel - Split and aggregate - Old Exchange is always null
问题描述
我看到,这个问题已经被问了很多次,但没有后帮或有一个结论性的解决方案。我分裂的消息,然后使用Aggregator2汇总吧。在code被扔异常,因为oldExchange总是空。因此,为了测试我设计了一个小code。
我读了订单,xml文件看起来像这样
<订单的xmlns =HTTP://有的/模式/订单>
<排序>
&所述; ORDERNUM→1&下; / ORDERNUM>
< /排序>
<排序>
&所述; ORDERNUM→2&下; / ORDERNUM>
< /排序>
<排序>
&所述; ORDERNUM→3&下; / ORDERNUM>
< /排序>
<排序>
&所述; ORDERNUM> 5℃/ ORDERNUM>
< /排序>
<排序>
&所述; ORDERNUM→6&下; / ORDERNUM>
< /排序>
我的骆驼上下文貌似这个
<骆驼:路线>
<骆驼:文件:源/数据/ catask /测试空操作=真正的从URI = />
<骆驼:日志消息=$ {}身体>< /骆驼:登录>
<骆驼:分流>
<骆驼:XPath的> // TE:订单/ *< /骆驼:XPath的>
<骆驼:到URI =直接:logQueries/>
<骆驼:到URI =直接:aggegateQueries/>
< /骆驼:分流>< /骆驼:路线><骆驼:路线>
<骆驼:从URI =直接:logQueries/>
<骆驼:日志消息=的号召后:\\ n $ {}身体>< /骆驼:登录>
< /骆驼:路线> <骆驼:路线>
<骆驼:从URI =直接:aggegateQueries/>
<骆驼:总strategyRef =aggrTaskcompletionInterval =8000>
<骆驼:correlationEx pression>
<骆驼:XPath的> // TE:订单< /骆驼:XPath的>
< /骆驼:correlationEx pression>
<骆驼:文件:源/数据/ catask /输出文件名=的Output.xml到URI = />< /骆驼:骨料>
< /骆驼:路线>
我的聚合策略类看起来像这样
公开交易总量(交易所oldExchange,交易所newExchange){
如果(oldExchange == NULL){
的System.out.println(回归新交易所);
返回newExchange;
} 字符串oldBody = oldExchange.getIn()getBody(为String.class)。
字符串newBody = newExchange.getIn()getBody(为String.class)。
oldExchange.getIn()setBody(oldBody +++ newBody)。
返回oldExchange;
}
的问题是,当聚合结果保存在output.xml中文件仅包含从orders.xml中读出的最后一个记录。
即。
<订购的xmlns =HTTP://有的/模式/订单>
&所述; ORDERNUM→6&下; / ORDERNUM>
< /排序>
我看着它进一步发现,这是第一次通话后,由于发生oldExchange应该有一定的价值,但事实证明它总是空。我认为,因为它是阅读所有从单个文件和分裂它,只有交流。
>任何建议??
更新1:每克劳斯我可以用分路器只是为了解决这个问题。我这样做,并能成功地加入所有的消息。但是我仍然在寻找一种方式来使用Aggregator2。在这里,我做到了用分离器只。
骆驼:路线>
<骆驼:文件:源/数据/ catask /测试空操作=真正的从URI = />
<骆驼:日志消息=$ {}身体>< /骆驼:登录>
<骆驼:拆分strategyRef =aggrTask>
<骆驼:XPath的> // TE:订单/ *< /骆驼:XPath的>
<骆驼:到URI =直接:logQueries/>
<
< /骆驼:分流>< /骆驼:路线><骆驼:路线>
<骆驼:从URI =直接:logQueries/>
<骆驼:日志消息=的号召后:\\ n $ {}身体>< /骆驼:登录>
< /骆驼:路线>
我想我想通了如何使用聚合汇总的消息。我添加了一个headerName名为id,并用它作为我的相关ID。
<骆驼:路线>
<骆驼:文件:源/数据/ catask /测试空操作=真正的从URI = />
<骆驼:日志消息=$ {}身体>< /骆驼:登录>
<骆驼:分流>
<骆驼:XPath的> // TE:订单/ *< /骆驼:XPath的>
<骆驼:到URI =直接:addHeaders/>
<骆驼:到URI =直接:aggegateQueries/>
< /骆驼:分流>< /骆驼:路线><骆驼:路线>
<骆驼:从URI =直接:addHeaders/>
<骆驼:headerName的setHeader =ID>
<骆驼:常数GT;为了< /骆驼:常数GT;
< /骆驼:&的setHeader GT;< /骆驼:路线> <骆驼:路线>
<骆驼:从URI =直接:aggegateQueries/>
<骆驼:总strategyRef =aggrTaskcompletionInterval =8000>
<骆驼:correlationEx pression>
<简单且GT; header.id< /简单>
< /骆驼:correlationEx pression>
<骆驼:文件:源/数据/ catask /输出文件名=的Output.xml到URI = />
<骆驼:日志消息=已合并:: / N $ {}身体>< /骆驼:登录>
< /骆驼:骨料>
< /骆驼:路线>
此聚集我的信息。不过,我现在还不能确定,尽管使用正确的XPATH为什么骆驼认为这是不同类型的消息?
骆驼论坛复印CLAUS的解释:
的看起来像它的前相关pression这是一个新的组
每个消息,如每个XPath的结果是不同的。
如果要分割,并加入相同的消息,然后看到这个EIP
http://camel.apache.org/composed-message-processor.html
而看到只使用分路器的例子。那是很容易做的。 的
我测试使用XPath计算器工具中的XPath前pression也打印出来coorelation前pression结果和我的//订单的所有消息都是一样的。 EX-
1组:
<排序>
&所述; ORDERNUM→1&下; / ORDERNUM>
< /排序> 第2组:
<排序>
&所述; ORDERNUM→2&下; / ORDERNUM>
< /排序>
I see that this question has been asked a number of times but none of post helped or had a conclusive solution. I am splitting a message and then aggregating it using Aggregator2. The code was throwing exception because oldExchange was always null. So to test I designed a small code.
I read an orders,xml file which looks like this
<Orders xmlns="http://some/schema/Order">
<Order>
<orderNum>1</orderNum>
</Order>
<Order>
<orderNum>2</orderNum>
</Order>
<Order>
<orderNum>3</orderNum>
</Order>
<Order>
<orderNum>5</orderNum>
</Order>
<Order>
<orderNum>6</orderNum>
</Order>
My camel Context Looks like this
<camel:route>
<camel:from uri="file:src/data/catask/test?noop=true"/>
<camel:log message="${body}"></camel:log>
<camel:split>
<camel:xpath>//te:Orders/*</camel:xpath>
<camel:to uri="direct:logQueries"/>
<camel:to uri="direct:aggegateQueries"/>
</camel:split>
</camel:route>
<camel:route>
<camel:from uri="direct:logQueries"/>
<camel:log message="After the call : \n ${body}"></camel:log>
</camel:route>
<camel:route>
<camel:from uri="direct:aggegateQueries"/>
<camel:aggregate strategyRef="aggrTask" completionInterval="8000" >
<camel:correlationExpression>
<camel:xpath>//te:Order</camel:xpath>
</camel:correlationExpression>
<camel:to uri="file:src/data/catask/output?fileName=output.xml"/>
</camel:aggregate>
</camel:route>
My Aggregation Strategy class looks like this
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
System.out.println("Returning new exchange");
return newExchange;
}
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
oldExchange.getIn().setBody(oldBody + "+" + newBody);
return oldExchange;
}
The problem is that when the aggregated result is saved in output.xml file it contains only the last record it read from Orders.xml.
i.e.
<Order xmlns="http://some/schema/Order">
<orderNum>6</orderNum>
</Order>
I looked into it further and found that this was happening because after the first call oldExchange should have some value but it turns out it is always null. I think that because it is reading everything from a single file and splitting it, there is only exchange.
> Any suggestions??
UPDATE 1: Per Claus I can use Splitter only to solve this issue. I did that and was able to successfully join all the messages. However I am still looking for a way to use Aggregator2. Here how I did it using Splitter only.
camel:route>
<camel:from uri="file:src/data/catask/test?noop=true"/>
<camel:log message="${body}"></camel:log>
<camel:split strategyRef="aggrTask">
<camel:xpath>//te:Orders/*</camel:xpath>
<camel:to uri="direct:logQueries"/>
<
</camel:split>
</camel:route>
<camel:route>
<camel:from uri="direct:logQueries"/>
<camel:log message="After the call : \n ${body}"></camel:log>
</camel:route>
I think I figured it out how to aggregate the messages using Aggregator. I added a headerName called id and use it as my correlation id.
<camel:route>
<camel:from uri="file:src/data/catask/test?noop=true"/>
<camel:log message="${body}"></camel:log>
<camel:split>
<camel:xpath>//te:Orders/*</camel:xpath>
<camel:to uri="direct:addHeaders"/>
<camel:to uri="direct:aggegateQueries"/>
</camel:split>
</camel:route>
<camel:route>
<camel:from uri="direct:addHeaders"/>
<camel:setHeader headerName="id">
<camel:constant>order</camel:constant>
</camel:setHeader>
</camel:route>
<camel:route>
<camel:from uri="direct:aggegateQueries"/>
<camel:aggregate strategyRef="aggrTask" completionInterval="8000" >
<camel:correlationExpression>
<simple>header.id</simple>
</camel:correlationExpression>
<camel:to uri="file:src/data/catask/output?fileName=output.xml"/>
<camel:log message="MERGED:: /n ${body}"></camel:log>
</camel:aggregate>
</camel:route>
This aggregates my messages. However I am still not sure that despite using correct XPATH why does Camel thinks that it is different type of message?
COPYING CLAUS's explanation from camel forums: "Looks like its your correlation expression that is a new group for each message, eg each xpath result is different. If you want to split and join the same messages then see this eip http://camel.apache.org/composed-message-processor.html And see the example using only splitter. That is much easier to do. "
I tested the Xpath expression using a Xpath Evaluator tool and also printed out the coorelation expression results and all my messages with //Order are same. Ex-
Group 1:
<Order>
<orderNum>1</orderNum>
</Order>
Group 2:
<Order>
<orderNum>2</orderNum>
</Order>
这篇关于Apache的骆驼 - 斯普利特和聚合 - 旧的Exchange总是空的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!