JMS消息侦听器Weblogic的并发处理 [英] Concurrent Processing of JMS Message Listener Weblogic

查看:276
本文介绍了JMS消息侦听器Weblogic的并发处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在JMS上运行测试用例,发现处理是顺序的。当我向使用JMS发送消息的servlet发出了200个请求,并且接收者(messageListner)正在顺序接收请求。如何接收并发请求?我们有任何参数要设置吗?我阅读JMS教程和API在同一会话消息被交付,即使我为每个发送请求创建一个新的会话, 10个会话在接收端仍然处理是连续的。

I'm running a test case on JMS and found processing is sequential. When I fired 200 requests to a servlet which sends messages using JMS and receiver(messageListner) is reciving requests sequentially. How to receive concurrent requests? Do we have any parameters to set? I read JMS tutorials and API's that in a same session messages are delivered sequntially, even I'm creating a new session for each send request & 10 sessions at receiving end still processing is sequential.

public class ProducerServlet extends javax.servlet.http.HttpServlet implements
    javax.servlet.Servlet {

// Defines the JNDI context factory.
public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

// Defines the JMS context factory.
public final static String JMS_FACTORY = "jms/TestConnectionFactory";

// Defines the queue.
public final static String QUEUE = "jms/TestJMSQueue";

public final static String TOPIC = "jms/TestTopic";

TestJMSListener jms = new TestJMSListener();
ConnectionFactory connectionFactory = null;
Queue dest1 = null;
Topic dest =null;
Connection connection = null;
MessageProducer producer = null;

protected void doGet(HttpServletRequest request,
        HttpServletResponse response) throws ServletException, IOException {
        try {
            connection = connectionFactory.createConnection();              

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(dest1);
        TextMessage message = session.createTextMessage();

        message.setText("This is message from JMSSECOND DEMO "
                + request.getParameter("Num"));
        System.out.println("Sending message: " + message.getText());
        producer.send(message);
        producer.send(session.createMessage());
    } catch (Exception e) {
        System.out.println("Exception occurred: " + e.toString());
    }

}

@Override
public void init(ServletConfig arg0) throws ServletException {      
    Context jndiContext = null;
    try {

        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, "http://localhost:7001");
        jndiContext = new InitialContext(env);
    } catch (NamingException e) {
        System.out.println("Could not create JNDI API context: "
                + e.toString());            
    }

    try {
        connectionFactory = (ConnectionFactory) jndiContext
                .lookup(JMS_FACTORY);
        dest1 = (Queue) jndiContext.lookup(QUEUE);
    } catch (Exception e) {
        System.out.println("JNDI API lookup failed: " + e.toString());
        e.printStackTrace();            
    }

}

}

Listner实现,其中在接收到消息之后我要睡觉(做一些事情一秒钟)。

Listner implementation where after receiving a message I'm going to sleep(doing something for a second).

public class TestJMSListener implements MessageListener {

// Defines the JNDI context factory.
public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

// Defines the JMS context factory.
public final static String JMS_FACTORY = "jms/TestConnectionFactory";

// Defines the queue.
public final static String QUEUE = "jms/TestJMSQueue";

public final static String TOPIC = "jms/TestTopic";

public TestJMSListener() {

    System.out.println("********* Consumer check **********");

    Context jndiContext = null;
    ConnectionFactory connectionFactory = null;
    Connection connection[] = null;
    Session session[] = null;
    Queue dest1 = null;
    Topic dest = null;
    MessageConsumer consumer[] = null;
    // TextMessage message = null;

    try {
        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, "http://localhost:7001");
        jndiContext = new InitialContext(env);
    } catch (NamingException e) {
        System.out.println("Could not create JNDI API context: "
                + e.toString());
        System.exit(1);
    }

    try {
        connectionFactory = (ConnectionFactory) jndiContext
                .lookup(JMS_FACTORY);
        dest1 = (Queue) jndiContext.lookup(QUEUE);
    } catch (Exception e) {
        System.out.println("JNDI API lookup failed: " + e.toString());
        System.exit(1);
    }
    connection = new Connection[10];
    session = new Session[10];
    consumer = new MessageConsumer[10];
    for (int i = 0; i < 10; i++) {
        try {

            connection[i] = connectionFactory.createConnection();
            session[i] = connection[i].createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            consumer[i] = session[i].createConsumer(dest);
            consumer[i].setMessageListener(this);
            connection[i].start();
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
        }
    }
}

@Override
public void onMessage(Message m) {

    if (m instanceof TextMessage) {
        TextMessage message = (TextMessage) m;
        try {
            System.out.println("Reading message from Listener: "
                    + new Date() + message.getText());
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

 }

我使用Weblogic 11g,默认配置为ConnectionFactory&队列。当我使用Topic它实际上每秒只提供一个消息(即在第一个消息完成后),对于Queue它每秒提供2到3个消息。如何使我的监听器支持并发处理。

I'm using Weblogic 11g, with default configurations for ConnectionFactory & Queue. When I used Topic it actually delivering only one message per second(i.e. after completion of first message) and for Queue it is delivering 2 to 3 messages per second. How to make my listener to support concurrent processing.

添加更多监听器对象,消费者在listners它解决了目的。
下面的修改代码。

Added more listener objects insted multiple sessions/consumers in listners it solved the purpose. Find the Modified Code below.

public class ProducerServlet extends javax.servlet.http.HttpServlet implements
    javax.servlet.Servlet {

// Defines the JNDI context factory.
public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

// Defines the JMS context factory.
public final static String JMS_FACTORY = "jms/TestConnectionFactory";

// Defines the queue.
public final static String QUEUE = "jms/TestJMSQueue";

public final static String TOPIC = "jms/TestTopic";
TestJMSListener listeners[] = new TestJMSListener[20];
ConnectionFactory connectionFactory = null;
Queue dest1 = null;
Topic dest =null;
Connection connection = null;
MessageProducer producer = null;

protected void doGet(HttpServletRequest request,
        HttpServletResponse response) throws ServletException, IOException {
        try {
            connection = connectionFactory.createConnection();              

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(dest1);
        TextMessage message = session.createTextMessage();

        message.setText("This is message from JMSSECOND DEMO "
                + request.getParameter("Num"));
        System.out.println("Sending message: " + message.getText());
        producer.send(message);
        producer.send(session.createMessage());
    } catch (Exception e) {
        System.out.println("Exception occurred: " + e.toString());
    }

}

@Override
public void init(ServletConfig arg0) throws ServletException {      
    Context jndiContext = null;
    try {

        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, "http://localhost:7001");
        jndiContext = new InitialContext(env);
    } catch (NamingException e) {
        System.out.println("Could not create JNDI API context: "
                + e.toString());            
    }

    try {
        connectionFactory = (ConnectionFactory) jndiContext
                .lookup(JMS_FACTORY);
        dest1 = (Queue) jndiContext.lookup(QUEUE);
        for(int i=0;i<listeners.length;i++ ){
        listeners[i]=new TestJMSListener(Integer.toString(i+1));    
        }

    } catch (Exception e) {
        System.out.println("JNDI API lookup failed: " + e.toString());
        e.printStackTrace();            
    }

}

}


public class TestJMSListener implements MessageListener {

// Defines the JNDI context factory.
public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

// Defines the JMS context factory.
public final static String JMS_FACTORY = "jms/TestConnectionFactory";

// Defines the queue.
public final static String QUEUE = "jms/TestJMSQueue";

public final static String TOPIC = "jms/TestTopic";

public String listnerNum = "";
public TestJMSListener(String listerNo) {
    super();
    System.out.println("********* Consumer check **********");
    listnerNum = listerNo;
    Context jndiContext = null;
    ConnectionFactory connectionFactory = null;
    Connection connection = null;
    Session session = null;
    Queue dest1 = null;
    Topic dest = null;
    MessageConsumer consumer = null;
    // TextMessage message = null;

    try {
        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, "http://localhost:7001");
        jndiContext = new InitialContext(env);
    } catch (NamingException e) {
        System.out.println("Could not create JNDI API context: "
                + e.toString());
        System.exit(1);
    }

    try {
        connectionFactory = (ConnectionFactory) jndiContext
                .lookup(JMS_FACTORY);
        dest1 = (Queue) jndiContext.lookup(QUEUE);
    } catch (Exception e) {
        System.out.println("JNDI API lookup failed: " + e.toString());
        System.exit(1);
    }
    try{
            connection = connectionFactory.createConnection();
            session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            consumer = session.createConsumer(dest1);
            consumer.setMessageListener(this);
            connection.start();
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
        }


}

@Override
public void onMessage(Message m) {

    if (m instanceof TextMessage) {
        TextMessage message = (TextMessage) m;
        try {
            System.out.println("Reading message from Listener: "+listnerNum+ " : "
                    + new Date() + message.getText());
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

}


推荐答案

在您的代码中,您只有一个侦听器实例(在创建Servlet实例时创建),因此您将只接收顺序消息,
不考虑你有多少发送者会话..它是只是队列。

如果要同时接收,则可能需要多个侦听器,并且只有一个时间消息将在任何一个侦听器中传递。

如果要同时处理消息,一旦其传送顺序,然后创建线程池并在单独的线程中调度过程,并返回到监听模式。
注意**在这种模式下,您可能无法正确处理Ack模式,因为您没有完成消息过程。

In your code, You have only one Listener instance (Created while Servlet instance created) , thus you will be receiving the messages sequencial only ,
Irrespective of how many sender session you have .. it is Just Queue.
If you want receive concurrently , then you might need multiple Listeners and only one time message will be delivered in any one of the listeners.
If you want to process the Messages concurrently, once its delivered sequencly then create thread pool and deligate the process in seperate thread and return back to listening mode .
Note** in this Mode you may not be handle Ack mode properly since you are ack without completing the Message process.

这篇关于JMS消息侦听器Weblogic的并发处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆