骆驼队列中的线程 DSL 行为 [英] Thread DSL behavior with queue in camel

查看:24
本文介绍了骆驼队列中的线程 DSL 行为的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在下面的路线中,我希望来自 queue1 的 10 个 msg 应该同时处理,但一次只有一个得到处理.

With below route i expect that 10 msg from queue1should be process concurrently, but only one gets process at a time.

我期待错了吗?或者做错了什么?

context.addRoutes(new RouteBuilder() {
        public void configure() {                                       
            from("test-jms:queue:test.queue1").threads(10)
            .process(sleep(1)); // sleep id is 1                
        }

        private Processor sleep(final int sleepId) {
            return new Processor() {                    
                @Override
                public void process(Exchange exchange) throws Exception {                       
                    System.out.println(curTime() + " Going for sleep sleepid=" + sleepId );
                    Thread.sleep(5000l);                        
                    System.out.println(curTime() + " Done sleep sleepid=" + sleepId );
                }
            };
        }

使用以下方法调用上述路由:

Calling the above routes using:

   ExecutorService ec = Executors.newFixedThreadPool(5);

    ec.submit(new Task(context,template));
    ec.submit(new Task(context,template));
    ec.submit(new Task(context,template));
    ec.submit(new Task(context,template));
    ec.submit(new Task(context,template));

static class Task  implements Runnable{
    CamelContext context;
    ProducerTemplate template;
    public Task(CamelContext context, ProducerTemplate template) {
        super();
        this.context = context;
        this.template = template;
    }
    @Override
    public void run() {         
           Exchange exchange = new DefaultExchange(context);
           exchange.setPattern(ExchangePattern.InOnly);
           exchange.getIn().setBody("Test Message: " + Thread.currentThread().getName());
           System.out.println(Thread.currentThread().getName());
           Exchange send = template.send("test-jms:queue:test.queue1",exchange);
           System.out.println("completed");           
    }

}

代码输出:

10:24:11 Going for sleep sleepid=1
10:24:16 Done sleep sleepid=1

10:24:16 Going for sleep sleepid=1
10:24:21 Done sleep sleepid=1

10:24:21 Going for sleep sleepid=1
10:24:26 Done sleep sleepid=1

10:24:26 Going for sleep sleepid=1
10:24:31 Done sleep sleepid=1

10:24:31 Going for sleep sleepid=1
10:24:36 Done sleep sleepid=1

如果我们观察时间戳,我们将看到该路由一次仅处理 1 个 msg.

If we observe the timestamp we will see that route is only processing 1 msg at time.

推荐答案

您需要在 JMS 端点上启用 asyncConsumer 以允许它异步.这样做时,从队列中消费的消息可能会被乱序处理,这就是为什么默认情况下对消费者进行排序的原因.

You need to enable asyncConsumer on the JMS endpoint to allow it to be async. When doing this then messages consumed from the queue can be processed out of order, and hence why a consumer is ordered by default.

代码应该是

 public void configure() {                                       
            from("test-jms:queue:test.queue1?asyncConsumer=true").threads(10)
            .process(sleep(1)); // sleep id is 1                
        }

但是 JMS 组件具有内置的并发性,通常更好用,因为它可以使用并发 JMS 消费者和并发网络.有关更多详细信息,请参阅选项 concurrentConsumersmaxConcurrentConsumers.

But the JMS component has built-in concurrency which is usually better to use, as then it can use concurrent JMS consumers, and concurrent networking. See the options concurrentConsumers and maxConcurrentConsumers for more details.

这篇关于骆驼队列中的线程 DSL 行为的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆