Apache Camel:具有聚合的多播-AggregationStrategy经常调用 [英] Apache Camel: multicast with aggregation - AggregationStrategy called too often

查看:389
本文介绍了Apache Camel:具有聚合的多播-AggregationStrategy经常调用的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

对于多播+聚合,我有以下奇怪(或至少我不清楚)的行为.考虑以下路线:

    from("direct:multicaster")
                .multicast()
                .to("direct:A", "direct:B")
                .aggregationStrategy(new AggregationStrategy() {
                    @Override
                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                        if (oldExchange == null) {
                            List firstResult = newExchange.getIn().getBody(List.class);
                            newExchange.getIn().setBody(ImmutableList.copyOf(firstResult));
                            return newExchange;
                        } else {
                            List oldResults = oldExchange.getIn().getBody(List.class);
                            List newResults = newExchange.getIn().getBody(List.class);
                            ImmutableList aggResult = ImmutableList.copyOf(Iterables.concat(oldResults, newResults));
                            oldExchange.getIn().setBody(aggResult);
                            return oldExchange;
                        }
                    }
                })
                .end()
//                .to("log:bla")

从本质上讲,此路由接受输入,将其发送到direct:Adirect:B,期望来自这两个端点的列表并将它们连接起来(由于我稍后将解释原因,最后一行中的注释在那里)./p> 现在,假定这两个端点返回".分别列出[A]和[B].如果我将消息M发送到direct:multicaster,则使用oldExchange = nullnewExchange.in.body=[A]调用聚合器一次,然后使用oldExchange.in.body=[A]newExchange.out.body=[B]调用聚合器(应该这样做).

到目前为止一切都很好.但是使用oldExchange.in.body=[A,B]newExchange.in=M再次调用聚合器(M是初始消息).这看起来与包含的浓缩模式相似.

您可以通过删除最后一行中的注释来获得预期的行为,即只需添加一个虚拟to("log:bla").一切都会按预期进行.

更新:正在尝试(请参见克劳斯提供的提示)

            .multicast()
            .aggregationStrategy(aggStrategy)
            .to("direct:A", "direct:B")
            .end()

            .multicast(aggStrategy)
            .to("direct:A", "direct:B")
            .end()

两者的行为相同.

这是怎么回事-我怎么了?

先谢谢了 马库斯

解决方案

我尝试重现该问题,但没有成功.这就是我所做的:

路线:

public class MulticastRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        AggregationStrategy myAggregationStrategy = new MyAggregationStrategy();
        List<String> listA = Lists.newArrayList("A");
        List<String> listB = Lists.newArrayList("B");
        from("direct:multicast").routeId("multicastRoute").multicast(myAggregationStrategy).to("direct:A", "direct:B").end();

        from("direct:A").setBody(constant(listA));
        from("direct:B").setBody(constant(listB));
    }

    class MyAggregationStrategy implements AggregationStrategy {
        @Override
        public org.apache.camel.Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            System.out.println("Aggregate called with oldExchange = " + (oldExchange == null ? "null" :
                    oldExchange.getIn().getBody().toString()) + ", newExchange = " +
                    newExchange.getIn().getBody().toString());
            return newExchange;
        }
    }
}

创建了一个简单的测试来运行该路线.

测试:

public class MulticastRouteTest extends CamelTestSupport {
  @Test
    public void testMulticastRoute() throws Exception {
        context.addRoutes(new MulticastRoute());
        template.sendBody("direct:multicast", null);
    }
}

此打印:

Aggregate called with oldExchange = null, newExchange = [A]
Aggregate called with oldExchange = [A], newExchange = [B]

这就是我们所期望的.希望这会帮助你.我看不出我做事的方式有什么不同,但希望您会发现它.

I have the following strange (or at least unclear to me) behaviour for a multi-cast + aggregation. Consider the following route:

    from("direct:multicaster")
                .multicast()
                .to("direct:A", "direct:B")
                .aggregationStrategy(new AggregationStrategy() {
                    @Override
                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                        if (oldExchange == null) {
                            List firstResult = newExchange.getIn().getBody(List.class);
                            newExchange.getIn().setBody(ImmutableList.copyOf(firstResult));
                            return newExchange;
                        } else {
                            List oldResults = oldExchange.getIn().getBody(List.class);
                            List newResults = newExchange.getIn().getBody(List.class);
                            ImmutableList aggResult = ImmutableList.copyOf(Iterables.concat(oldResults, newResults));
                            oldExchange.getIn().setBody(aggResult);
                            return oldExchange;
                        }
                    }
                })
                .end()
//                .to("log:bla")

Essentially, this route takes an input, sends it to direct:A and direct:B, expects lists from these two endpoints and concatenates them (the comment in the last line is there for a reason I will explain later).

Now assume that these two endpoints "return" the lists [A] and [B], respectively. If I send the message M to direct:multicaster, then the aggregator is called once with oldExchange = null and newExchange.in.body=[A], then with oldExchange.in.body=[A] and newExchange.out.body=[B] (as it is supposed to do).

All good up to this point. But the aggregator is called once more with oldExchange.in.body=[A,B] and newExchange.in=M (M is the initial message). This looks similar to an included enrichment pattern.

You can get the expected behaviour by removing the comment in the last line, i.e. simply adding a dummy to("log:bla"). With this everthing behaves as expected.

Update: Trying (cf. the hint provided by Claus)

            .multicast()
            .aggregationStrategy(aggStrategy)
            .to("direct:A", "direct:B")
            .end()

and

            .multicast(aggStrategy)
            .to("direct:A", "direct:B")
            .end()

both result in the same behaviour.

What is happening here - what did I get wrong?

thanks in advance markus

解决方案

I have tried to reproduce the problem, but without success. This is what I did:

The route:

public class MulticastRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        AggregationStrategy myAggregationStrategy = new MyAggregationStrategy();
        List<String> listA = Lists.newArrayList("A");
        List<String> listB = Lists.newArrayList("B");
        from("direct:multicast").routeId("multicastRoute").multicast(myAggregationStrategy).to("direct:A", "direct:B").end();

        from("direct:A").setBody(constant(listA));
        from("direct:B").setBody(constant(listB));
    }

    class MyAggregationStrategy implements AggregationStrategy {
        @Override
        public org.apache.camel.Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            System.out.println("Aggregate called with oldExchange = " + (oldExchange == null ? "null" :
                    oldExchange.getIn().getBody().toString()) + ", newExchange = " +
                    newExchange.getIn().getBody().toString());
            return newExchange;
        }
    }
}

Created a simple test just to run the route.

The test:

public class MulticastRouteTest extends CamelTestSupport {
  @Test
    public void testMulticastRoute() throws Exception {
        context.addRoutes(new MulticastRoute());
        template.sendBody("direct:multicast", null);
    }
}

This prints:

Aggregate called with oldExchange = null, newExchange = [A]
Aggregate called with oldExchange = [A], newExchange = [B]

This is what we would expect. Hope this will help you. I can not see any difference in the way I do things, but hopefully you will spot it.

这篇关于Apache Camel:具有聚合的多播-AggregationStrategy经常调用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆