使用 ActiveMQ 并行处理多条消息 [英] Processing multiple message in parallel with ActiveMQ

查看:59
本文介绍了使用 ActiveMQ 并行处理多条消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想使用简单的 Processor/AsyncProcessor 作为目标并行处理队列中的消息.处理器处理每条消息需要一点时间,但可以单独处理每条消息,因此可以同时(在正常范围内).

I'd like to process messages in a queue in parallel using a simple Processor/AsyncProcessor as a destination. The processor takes a little time per message, but each message can be handled seperately, and thus at the same time (within healthy boundaries).

我很难找到示例,尤其是骆驼路由的 xml 配置.

I'm having a hard time finding examples, especially about the xml configuration of camel routes.

到目前为止,我已经定义了一个线程池、路由和处理器:

So far, I've defined a threadpool, route and processor:

<threadPool id="smallPool" threadName="MyProcessorThread" poolSize="5" maxPoolSize="50" maxQueueSize="100"/>
<route>
    <from uri="broker:queue:inbox" />
    <threads executorServiceRef="smallPool">
        <to uri="MyProcessor" />
    </threads>
</route>
<bean id="MyProcessor" class="com.example.java.MyProcessor" />

我的处理器看起来像:

public class MyProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        Message in = exchange.getIn();
        String msg = in.getBody(String.class);      
        System.out.println(msg);
        try {
            Thread.sleep(10 * 1000); // Do something in the background
        } catch (InterruptedException e) {}
        System.out.println("Done!");
    }
}

不幸的是,当我将消息发布到队列时,它们仍然被一一处理,每个延迟 10 秒(我的后台任务").

Unfortunatly, when I post messages to the queue, they are still processed one by one, each delayed by 10 seconds (my "background task").

谁能为我指出正确的方向以使用定义的线程池处理消息或解释我做错了什么?

Can anyone point me to the right direction to have the messages processed using the defined threadpool or explain what I am doing wrong?

推荐答案

您应该使用 concurrentConsumers 选项,如评论中所述,

You should use the concurrentConsumers options as said in the comments,

<route>
    <from uri="broker:queue:inbox?concurrentConsumers=5" />
    <to uri="MyProcessor" />
</route>

请注意,还有 maxConcurrentConsumers 您可以设置使用最小/最大并发消费者范围,因此 Camel 将根据负载自动增长/缩小.

Notice there is also maxConcurrentConsumers you can set to use a min/max range of concurrent consumers, so Camel will automatic grow/shrink depending on load.

在 JMS 文档中查看更多详细信息

See more details in the JMS docs at

这篇关于使用 ActiveMQ 并行处理多条消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆