如何骆驼2.11聚集一批具有独立路线的作品? [英] How Camel 2.11 batch aggregation works with separate route?

查看:231
本文介绍了如何骆驼2.11聚集一批具有独立路线的作品?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

首先有一个类似的悬而未决的问题<一href=\"https://stackoverflow.com/questions/21584824/camel-joining-routes-into-a-single-aggregator\">Joining路由到一个聚合

我们有一些消费路线(FTP,文件,SMB)从远程系统读取文件。
简化直接路线测试,但类似的行为有一批消费者:

 从(直接:+ routeId).ID(routeId)
 .setProperty(AGGREGATION_PROPERTY,常数(routeId))
 .LOG(的String.format(发送($ {}体)到%s,直接:启动1))
 。要(直接:聚合);

改造后从一个民意调查的所有结果由一批在一个单独的路由聚合:

 从(直接:总)
  .aggregate(财产(AGGREGATION_PROPERTY),新BodyInAggregatingStrategy())
  .completionFromBatchConsumer()
  。要(日志:结果,模拟:结果);

一切工作正常,如果每个消费者都运行分开。但是,如果多个用户在并行运行的,聚集会分裂投票。如果文件消费者投票500封邮件和第二条路线开始从通过ftp expections读6个文件的例子是,我们得到2骨料1从文件500的消息,并从FTP 1 6的消息。

测试用例:

 公共无效testAggregateByProperty()抛出异常{
    MockEndpoint结果= getMockEndpoint(模拟:结果);    result.expectedBodiesReceived(A + A + A,B + B,A,Z);    template.sendBodyAndProperty(直接:A,A,Exchange.BATCH_SIZE,3);
    template.sendBodyAndProperty(直接:A,A,Exchange.BATCH_SIZE,3);
    template.sendBodyAndProperty(直接:B,B,Exchange.BATCH_SIZE,2);
    template.sendBodyAndProperty(直接:A,A,Exchange.BATCH_SIZE,3);
    template.sendBodyAndProperty(直接:B,B,Exchange.BATCH_SIZE,2);
    template.sendBodyAndProperty(直接:A,A,Exchange.BATCH_SIZE,1);
    template.sendBodyAndProperty(直接:Z,Z,Exchange.BATCH_SIZE,7);    assertMockEndpointsSatisfied();
}

结果是:A + A,B,A,B,A,而不是预期的A + A + A,B + B,A, Z。
问题:


  1. 是我们对聚集的假设错了?

  2. 我们如何才能实现预期的行为?

  3. 如果我们设置completionTimeout,它煤层的超时将第一次交换发生 - 如果独立仍然有新的交流


解决方案

您几乎都有它的工作。以下是你需要(之后我会解释)的变化。

 从(直接:聚合)。ID(汇总)
    .aggregate(财产(AGGREGATION_PROPERTY),新BodyInAggregationStrategy())
    .completionSize(财产(Exchange.BATCH_SIZE))
    。要(日志:结果,模拟:结果)

其结果将是:

 外汇收,体:A + A + A
交易所收,身体:乙+ B
交易所收,身体:一个

注意:您将不会收到结果为Z,因为批量大小为 7

要解释 - 因为您已阅读,在汇聚的是一个多功能的骆驼组成部分,关键的东西正确定义是:


  • 聚集前pression

  • 完成规则

现在你的情况,你都聚集在 AGGREGATION_PROPERTY 属性,这将是 A 以Z 。另外您指定的批次数量。

不过你是不是前$ P $在您的路线pssing一个 completionSize()。相反,你用 completionFromBatchConsumer - 它做了不同(code状态,它看起来对交易所#BATCH_COMPLETE 属性),因此奇怪的结果。

总之, .completionSize(Exchange.BATCH_SIZE)将根据需要测试运行。

好运进一步。

First there is a similar unanswered question Joining routes into single aggregator

We have some consumer routes (ftp, file, smb) reading files from remote systems. Simplified for test with direct route, but similar behavior with batch consumers:

from("direct:"+routeId).id(routeId)
 .setProperty(AGGREGATION_PROPERTY, constant(routeId))
 .log(String.format("Sending (${body}) to %s", "direct:start1"))
 .to("direct:aggregate");

After transformation all results from one poll are aggregated by batch in a separate route:

from("direct:aggregate")
  .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
  .completionFromBatchConsumer()
  .to("log:result", "mock:result");

All works fine, if every consumer runs separated. But if multiple consumers runs in parallel, aggregation will split the polls. Example if file-consumer polls 500 messages and a second route starts to read 6 files from ftp the expections is that we get 2 aggregates 1 with 500 messages from file and 1 with 6 messages from ftp.

Testcase:

public void testAggregateByProperty() throws Exception {
    MockEndpoint result =  getMockEndpoint("mock:result");

    result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");

    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
    template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);

    assertMockEndpointsSatisfied();
}

The result is: "A+A", "B", "A", "B", "A" and not the expected "A+A+A", "B+B", "A", "Z". Questions:

  1. Is our assumption about aggregation wrong?
  2. How can we achieve the expected behavior?
  3. If we set completionTimeout, it seams that timeout will occur from first exchange - independent if there are still new exchanges?

解决方案

You almost have it working. Here is the change you need (and after I will explain).

from("direct:aggregate").id("aggregate")
    .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregationStrategy())
    .completionSize(property(Exchange.BATCH_SIZE))
    .to("log:result", "mock:result")

The result will be:

Exchange received, body: A+A+A
Exchange received, body: B+B
Exchange received, body: A

Note: You won't receive a result for the "Z" since the batch size is 7.

To explain - as you have read, the Aggregator is a versatile camel component and the key things to define correctly are:

  • the aggregation expression
  • the completion rule

Now in your case you are aggregating on a property AGGREGATION_PROPERTY which will be A, B or Z. In addition you are specifying a batch size.

However you aren't expressing a completionSize() in your route. Instead you were using completionFromBatchConsumer - which does something different (the code states that it looks for a Exchange#BATCH_COMPLETE property), thus the weird results.

Anyway, .completionSize(Exchange.BATCH_SIZE) will make your test run as desired.

Good luck further.

这篇关于如何骆驼2.11聚集一批具有独立路线的作品?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆