骆驼组播路由呼叫顺序 [英] Camel Multicast Route call order

查看:28
本文介绍了骆驼组播路由呼叫顺序的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

任务:

  • 我想创建一个批处理作业,如路由执行,由于我的用例,我必须使用自定义批处理.
  • 实现批处理作为来自路由multicast(a,save)的同步路由->multicast(b, save) ->multicast(c, save) ->multicast(d, save).在每条路线上,我都使用 multicast 来保存进程当前所在的当前状态.所以以后如果出现问题,我有关于批处理的清晰日志.
  • 提到的保存路由是一个 ActiveMQ 路由,只有一个消费者和一个生产者
  • 提到的a,b,c,d 路由是一个direct:调用路由
  • 前面提到的 ActiveMQ 是一个 FIFO 队列
  • I want to create a batch job like route execution, due to my use case I have to use custom batch processing.
  • Implemented batch processing as a synchronous route from route multicast(a,save) -> multicast(b, save) -> multicast(c, save) -> multicast(d, save). On each route, I'm using a multicast to save the current status where the process is currently. So later if something goes wrong, I have clear logs about the batch process.
  • The mentioned save route is an ActiveMQ route with a single consumer and a single producer
  • The mentioned a,b,c,d route is a direct: call to the route
  • The mentioned ActiveMQ is a FIFO queue

问题:

  • 当路由 a 被调用时,它开始执行所描述的路由,所以首先 a ->b ->c ->d.然而,来自 multicast 的保存路由调用似乎从最后到第一个.因此,按以下顺序调用保存路由:d ->c ->b ->a
  • 我尝试使用 .parallelProcessing(),它似乎对 multicast 中的其他路由启动了一个新进程,但随后发生的情况是它完全忘记了路由并且保存没有发生在保存路线上.
  • When Route a called then it starts to executes the routes as described, so first a -> b -> c -> d. HOWEVER, the save route call from the multicast seems to go from Last to first. So, Save route called in this order: d -> c -> b -> a
  • I tried to use .parallelProcessing() which seem to start a new process to other routes in the multicast, but then what happens is it completely forgets the route and saving not happening to the save route.

期望的行为:

  • 同步执行a ->b ->c ->d
  • a的顺序同步执行保存路由->b ->c ->d
  • Synchronous execution of a -> b -> c -> d
  • Synchronous execution of save route in order of a -> b -> c -> d

相关示例代码:

private static final String a = "direct:a";
private static final String b = "direct:b";
private static final String c = "direct:c";
private static final String d = "direct:d";

private static final String save = "activemq:save";

private static final String toFile = "file:///camel-logs/batch-status";

@Override
public void configure() throws Exception {

    ExceptionHandlerBuilder.retry(this);

    from(a)
        .multicast().parallelProcessing()
        .to(b, save);

    from(b)
        .multicast().parallelProcessing()
        .to(c, save);

    from(c)
        .multicast().parallelProcessing()
        .to(d, save);

    from(d)
        .multicast().parallelProcessing()
        .to("mock:end", save);
                
    from(save)
        .process(new ModifyText())
        .to(toFile + "?fileExist=Append&charset=utf-8");
}

  • 可以看出,activeMQ 的配置可能存在一些问题,以防万一我添加了 JMS 组件的配置代码

组件:

public class Components { 
@Bean 
public JmsComponent jmsComponent() throws JmsException { 
ActiveMQConnectionFactory activeMQCF = Utilities.getActiveMQFactory(); 
JmsComponent jms = new JmsComponent(); 
jms.setConnectionFactory(activeMQCF); 
return jms; 
} 
}

功能:

    public static ActiveMQConnectionFactory getActiveMQFactory() {
        logger.debug("getActiveMQFactory called");
        String brokerUrl = getActiveMQConfig();

        ActiveMQConnectionFactory activeMQCF = new ActiveMQConnectionFactory();
        activeMQCF.setBrokerURL(brokerUrl);
        activeMQCF.setUserName(activeMQUserName);
        activeMQCF.setPassword(activeMQPassword);
        return activeMQCF;
    }

推荐答案

问题答案:

  • 我的问题的答案仍然最受欢迎,但是现在我找到了解决方法

解决方案:

  • 在每个 from().to() 之间,现在我在该处理器中添加了一个 .processor() 我能够添加一个可以正确调用端点的生产者模板
  • 删除了 ActiveMQ

路线:

private static final String Save = "direct:save"

处理器:

        public class SaveProcessor implements Processor {
            public void process(Exchange exchange) throws Exception {
                ProducerTemplate template = exchange.getContext().createProducerTemplate();
                template.send(Save, exchange);
        /** NOTE
         *  DO NOT USE any other functions which are not working with EXCHANGES.
         *  Functions that uses body, header values and such are bugged
         */ 
            }
        }

实现路由:

from(a)
    .process(new SaveProcessor())
    .to(b);

这篇关于骆驼组播路由呼叫顺序的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆