活动MQ连接问题 [英] Active MQ connection issue

查看:169
本文介绍了活动MQ连接问题的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

您好,我正在研究wso2 esb并将Active MQ用于消息队列。

Hi am working on wso2 esb and using Active MQ for message queue.

我有一个简单的服务来放置一条消息,在该消息中它将调用自定义Java类。创建一个tcp连接并将消息放入队列中。

I have a simple service to place a message in which it call custom java class where it creates a tcp connection and drops a message in queue.

Java代码如下所示

Java code looks like below

   package in.esb.custommediators;

import javax.jms.*; 

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.MessageContext; 
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.mediators.AbstractMediator;

import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.transport.nhttp.NhttpConstants;

import org.json.JSONObject;
import org.json.XML;

public class JMSStoreMediator extends AbstractMediator implements
ManagedLifecycle { 

    Connection connection;
    Session session;

    public boolean mediate(MessageContext msgCtx) { 

        log.info("LogLocation = "+getClass().getName()+",ProxyName = "+msgCtx.getProperty("proxy.name")+
                ",Usercode = "+msgCtx.getProperty("usercode")+",Clientid = "+msgCtx.getProperty("clientid")+
                ",requestMsgId = "+msgCtx.getProperty("requestMsgId")+",Position = START"); 


         try {
             boolean topic=false;
            String jmsuri=""+msgCtx.getProperty("jmsuri");
            String t=""+msgCtx.getProperty("topic");
            if(t.isEmpty()){
                topic=false;
            }
            else {
                topic=Boolean.valueOf(t);
            }
            ConnectionFactory factory= new ActiveMQConnectionFactory(jmsuri);
            connection = factory.createConnection();
                connection.start();

            log.info("LogLocation = "+getClass().getName()+",JMS connection created :"+connection);
            this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination=null;
            if(!topic)destination= session.createQueue(""+msgCtx.getProperty("jmsqueue"));
            else destination= session.createTopic(""+msgCtx.getProperty("jmsqueue"));
            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);

            String xml = ""+msgCtx.getEnvelope().getBody().toStringWithConsume();

            if(topic){

                JSONObject obj=XML.toJSONObject(xml);
                JSONObject ar=obj.getJSONObject("soapenv:Body");
                ar.remove("xmlns:soapenv");
                xml=ar.toString();
            }
            TextMessage message = session.createTextMessage(xml);
            producer.send(message);


        } catch (Exception e) {

            log.info("LogLocation = "+getClass().getName()+",Error in storing message in JMS stacktrace is :"+e.toString()+"message is :"+e.getMessage());
            e.printStackTrace();

            ((Axis2MessageContext) msgCtx).setProperty(NhttpConstants.HTTP_SC, 500);
            handleException("Error while storing in the message store", msgCtx);

        }
        finally {
            try {
                session.close();
                if (connection!=null){
                    log.info("LogLocation = "+getClass().getName()+",JMS connection closing :"+connection);
                    connection.close();
                }

            } catch (JMSException e) {
                log.info("LogLocation = "+getClass().getName()+",Error in closing JMS connection stacktrace is :"+e.toString());
                e.printStackTrace();
            }
        }

        return true;
    }

    @Override
    public void destroy() {
        // TODO Auto-generated method stub

    }

    @Override
    public void init(SynapseEnvironment arg0) {
        // TODO Auto-generated method stub

    }

}

当我调用此服务以在日志下方的队列中发送消息时生成。

when i call this service to send a message in queue below logs get generated.

[2017-07-29 11:18:35,962]  INFO - JMSStoreMediator LogLocation = in.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-3:1,clientId=ID:my-desktop-36442-1501307315570-2:1,started=true}

到目前为止,一切正常,但是当两个用户尝试在同一轮胎上提交消息时,会发生一些奇怪的事情,如下所示

As of now every thing is working good , But when two users try to submit message at the same tire some strange thing happen as shown below

[2017-07-29 11:43:11,948]  INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-11:1,clientId=ID:my-desktop-36442-1501307315570-10:1,started=false}
[2017-07-29 11:43:11,963]  INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,JMS connection created :ActiveMQConnection {id=ID:my-desktop-36442-1501307315570-11:1,clientId=ID:my-desktop-36442-1501307315570-10:1,started=true}

[2017-07-29 11:43:12,068]  INFO - JMSStoreMediator LogLocation = in.my.esb.custommediators.JMSStoreMediator,Error in closing JMS connection stacktrace is :org.apache.activemq.ConnectionClosedException: The connection is already closed

活动MQ正在创建两个连接,但是两个连接都使用一个连接,并且一个连接在一个服务调用中被关闭,并且在另一个服务调用中抛出已关闭的错误,另一个连接在活动mq的连接列表中永远等待活动状态为s在下面的图像中显示 Hown,这在ESB线程列表中也可以看到。

Active MQ is creating two connections but using one connection for both the calls and that one connection is getting closed in one of the service call and throwing already closed error in the other service call and the other connection is waiting forever in the connection list of active mq with active status true as shown in the below image and this is also seen in the ESB thread list.

这种连接堆积并导致ESB服务器挂起。即使我从Active MQ ESB线程重置了此连接,也携带了此连接信息,并且仅在重新启动ESB之后,问题才得以解决。

This kind of connections pileup and causing hangs ESB server. Even if i reset this connections from Active MQ ESB threads carry this connection info and only after a restart of ESB the problem get fixed.

推荐答案

您是否已阅读文章扩展WSO2企业服务总线的功能-第1部分

重要的部分是线程安全。它指出,每个传入中介(包括自定义)都在传入消息之间共享。我建议移动类变量

Important part is Threading Safety. It states, each mediator, including custom, is shared between incoming messages. I recommend to move class variables

Connection connection;
Session session;

到方法 public boolean mediate(MessageContext msgCtx)因为本地变量是线程安全

to method public boolean mediate(MessageContext msgCtx) since local variables are thread safe

public class JMSStoreMediator extends AbstractMediator implements
ManagedLifecycle {     

    public boolean mediate(MessageContext msgCtx) { 
             Connection connection;
             Session session;
    ....
    ....
    rest the same

这篇关于活动MQ连接问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆