ActiveMQ节流消费者 [英] ActiveMQ throttling consumer

查看:155
本文介绍了ActiveMQ节流消费者的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想对hornetq中的activeMQ中某个队列的使用者进行节流(对于jboss,这是对mdb Consumer的定义进行了注释)。我在activemq的文档中找不到任何类似的东西,我发现最接近的是

  consumer.recvDelay 0 ms暂停消费者每条消息的recvDelay毫秒(允许消费者限制)。 

来自: http://activemq.apache.org/activemq-performance-module-users-manual.html



但是我找不到在Java中的方法。



谢谢,



问候。



编辑:这是ActiveMQManager代码和使用者代码:

 公共类ActiveMQManager {

private static ActiveMQConnectionFactory CONNECTION_FACTORY;

公共静态连接CONNECTION;

公共静态会话会话;

公共静态目标TEST_QUEUE;

public static void start(){
try {

CONNECTION_FACTORY = new ActiveMQConnectionFactory( vm:// localhost);

CONNECTION = CONNECTION_FACTORY.createConnection();
CONNECTION.start();

SESSION = CONNECTION.createSession(false,
Session.CLIENT_ACKNOWLEDGE);

TestClient testClient = new TestClient();

TEST_QUEUE = SESSION.createQueue( TEST.QUEUE);

MessageConsumer testConsumer = SESSION.createConsumer(TEST_QUEUE);
test.setMessageListener(testClient);

}捕获(异常e){
}
}

public static void stop(){
try {
//清理
SESSION.close();
CONNECTION.close();
} catch(JMSException e){
log.error(e);
}
}
}

消费者代码非常简单(对于此示例):

 公共类TestConsumer实现MessageListener {

@Override
public void onMessage(消息消息){
//使用消息
}

}


解决方案

这取决于所使用的消费者技术...但是这里有一些选择




  • 您可以手动在用户代码中引入延迟(这不是一门精确的科学,但这会限制吞吐量)


  • 您还可以通过设置JMS连接的maxConcurrentConsumers属性来控制使用者使用的线程数...也就是说,这不会限制消息吞吐量,只是限制了使用者使用的并发级别


  • 更好的是,您可以设置每次每时间消耗的确切消息数iod使用节流阀EIP实现



    例如,使用骆驼是不重要的节流阀



    from( activemq:queueA)。throttle(10).to( activemq:queueB)



i want to do a throttling to a consumer of some queue in the activeMQ, in hornetq (of jboss, this is do it with annotations on the definition of the mdb Consumer). I can't find any similar in the documentation of activemq, the closest that i find was this

consumer.recvDelay   0 ms    Pause consumer for recvDelay milliseconds with each message (allows consumer throttling).

from: http://activemq.apache.org/activemq-performance-module-users-manual.html

But there i can't find how i can do it in java.

Thanks in advance,

Regards.

EDIT: Here is the ActiveMQManager code and the consumer code:

public class ActiveMQManager {

    private static ActiveMQConnectionFactory CONNECTION_FACTORY;

    public static Connection CONNECTION;

    public static Session SESSION;

    public static Destination TEST_QUEUE;

    public static void start() {
        try {

            CONNECTION_FACTORY = new ActiveMQConnectionFactory("vm://localhost");

            CONNECTION = CONNECTION_FACTORY.createConnection();
            CONNECTION.start();

            SESSION = CONNECTION.createSession(false,
                    Session.CLIENT_ACKNOWLEDGE);

            TestClient testClient = new TestClient();

            TEST_QUEUE = SESSION.createQueue("TEST.QUEUE");

            MessageConsumer testConsumer = SESSION.createConsumer(TEST_QUEUE);
            test.setMessageListener(testClient);

        } catch (Exception e) {
        }
    }

    public static void stop() {
        try {
            // Clean up
            SESSION.close();
            CONNECTION.close();
        } catch (JMSException e) {
            log.error(e);
        }
    }
}

The consumer code is very simple (for this example):

public class TestConsumer implements MessageListener {

    @Override
    public void onMessage(Message message) {
        //Do something with the message
    }

}

解决方案

this depends on the consumer technology being used...but here are a few options

  • you can manually introduce a delay in your consumer code (not an exact science but this will limit the throughput)

  • you can also control the number of threads that your consumer uses by setting maxConcurrentConsumers property of you JMS connection...that said, this won't throttle message throughput, just limit the level of concurrency being used by your consumer

  • better yet, you can set the exact number of messages to consume per time period using a throttler EIP implementation

    for example, this is trivial using the Camel Throttler

    from("activemq:queueA").throttle(10).to("activemq:queueB")

这篇关于ActiveMQ节流消费者的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆