多线程JMS客户端ActiveMQ [英] Multithreaded JMS client ActiveMQ

查看:126
本文介绍了多线程JMS客户端ActiveMQ的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用以下代码为多个消费者创建多个JMS会话以使用消息。我的问题是代码以单线程方式运行。即使消息存在于队列中,第二个线程也无法接收任何内容并且只是保持轮询。第一个线程同时完成第一批处理并返回并消耗剩余的消息。这里的使用有什么问题吗?

I am using the below code to create multiple JMS sessions for multiple consumers to consume messages. My problem is that the code is running in a single threaded fashion. Even if messages are present in the Queue the Second thread is unable to receive anything and just keeps polling. The first thread meanwhile finishes processing the first batch and comes back and consumes the remaining messages. Is there anything wrong with the usage here ?

static {
    try {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616");
        connection = connectionFactory.createConnection();
        connection.start();
    } catch (JMSException e) {
        LOGGER.error("Unable to initialise JMS Queue.", e);
    }

}

public JMSClientReader(boolean isQueue, String name) throws QueueException {

    init(isQueue,name);
}

@Override
public void init(boolean isQueue, String name) throws QueueException
{

    // Create a Connection
    try {
        // Create a Session
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        if (isQueue) {
            destination = new ActiveMQQueue(name);// session.createQueue("queue");
        } else {
            destination = new ActiveMQTopic(name);// session.createTopic("topic");
        }
        consumer = session.createConsumer(destination);
    } catch (JMSException e) {
        LOGGER.error("Unable to initialise JMS Queue.", e);
        throw new QueueException(e);
    }
}

public String readQueue() throws QueueException {

    // connection.setExceptionListener(this);
    // Wait for a message
    String text = null;
    Message message;
    try {
        message = consumer.receive(1000);
        if(message==null)
            return "done";
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            text = textMessage.getText();
            LOGGER.info("Received: " + text);
        } else {
            throw new JMSException("Invalid message found");
        }
    } catch (JMSException e) {
        LOGGER.error("Unable to read message from Queue", e);
        throw new QueueException(e);
    }


    LOGGER.info("Message read is " + text);
    return text;

}


推荐答案

你的问题是prefetchPolicy。

your problem is the prefetchPolicy.

persistent queues (default value: 1000)
non-persistent queues (default value: 1000)
persistent topics (default value: 100)
non-persistent topics (default value: Short.MAX_VALUE - 1)

所有消息都被分派给第一个连接的消费者,当另一个消息连接时他没有收到消息,所以要改变这种行为,如果你有一个队列的并发消费者,你需要将prefetchPolicy设置为低于默认值的值。例如,将此 jms.prefetchPolicy.queuePrefetch = 1 添加到activemq.xml中的uri配置中,或者将其设置在客户端URL上,如下所示

all messages was dispatched to the first connected consumer and when another one connects he don't receive messages, so to change this behavior if you have concurrent consumer for a queue you need to set prefetchPolicy to a lower value than default. for example add this jms.prefetchPolicy.queuePrefetch=1 to the uri config in activemq.xml or set it on the client url like this

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://172.16.143.99:61616?jms.prefetchPolicy.queuePrefetch=1");




建议使用大预取值以获得高性能且高
消息卷。但是,对于较低的消息量,每个
消息需要很长时间才能处理,预取应该设置为1.
这样可以确保消费者一次只处理一条消息。
但是,将预取限制设置为零将导致消费者
一次一个地轮询消息,而不是将
消息推送给消费者。

Large prefetch values are recommended for high performance with high message volumes. However, for lower message volumes, where each message takes a long time to process, the prefetch should be set to 1. This ensures that a consumer is only processing one message at a time. Specifying a prefetch limit of zero, however, will cause the consumer to poll for messages, one at a time, instead of the message being pushed to the consumer.

看看 http://activemq.apache.org/what-is-the-prefetch-limit-for.html

并且

http:// activemq。 apache.org/destination-options.html

这篇关于多线程JMS客户端ActiveMQ的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆