传输侦听器和ActiveMq重新启动 [英] Transport Listener and ActiveMq restart

查看:127
本文介绍了传输侦听器和ActiveMq重新启动的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我开发了JMS应用程序,现在我想添加支持代理重新启动的功能.我有动态主题,应该在恢复连接后重新创建它们.另外,我还应该知道经纪人何时失灵以及何时回来. 因此,我尝试使用ActiveMQ故障转移协议来实现此功能.我实现了TransportListener,并在方法"transportInterrupted"中调用了完全断开连接,如

I develope JMS application and now I want to add feature that will support broker restart. I have dynamic topics and I should recreate them after my connection will be resumed. Also I should have a possibility to know when broker is down and when it come back. So I try to implement this feature using ActiveMQ failover protocol. I implement TransportListener and in the method "transportInterrupted" I call full disconnect like

  public void disconnect() throws JMSException {
    System.out.println("!!!!!!!DISCONNECTING!!!!!!!!");
    consumer.close();
    session.close();
    session = null;
    messageProducer.close();
    messageProducer = null;
    connection = null;
    connected = false;
    System.out.println("!!!!!!!DISCONNECTED!!!!!!!!");
}

此后,我的应用程序挂起,就像是竞争条件.如果我关闭()仅生产者并将连接设置为null,则一切正常,但是,如果我尝试关闭消费者,则仅在N种情况下有效1次.我写的测试证明了这一点.我认为关闭消费者方面存在问题,但我没有发现我做错的任何信息.

After this my application hangs and it's like race conditions. If i close() only producer and set connection to null everything is OK, but if i try to close consumer it works only in 1 from N cases. I write test that proves it. I think the problem in closing consumers, but i didn't find any information what i do wrong.

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class FastFailProducer {
    volatile boolean connected = false;
    private ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?timeout=5000");;
    private static FailoverListener failoverListener;
    private Connection connection;
    private Session session;
    private Queue queue;
    private MessageProducer messageProducer;
    private MessageConsumer consumer;
    private String something;

public void init() throws JMSException {
    System.out.println("!!!!!!!CONNECTING!!!!!!!!");
    connection = factory.createConnection();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    connection.start();
    ((ActiveMQConnection) connection).addTransportListener(failoverListener);
    queue = session.createQueue("TEST");
    messageProducer = session.createProducer(queue);
    consumer = session.createConsumer(queue);
    System.out.println("!!!!!!!CONNECTING COMPLETE!!!!!!!!");
    connected = true;
}

public void disconnect() throws JMSException {
    System.out.println("!!!!!!!DISCONNECTING!!!!!!!!");
    consumer.close();
    session.close();
    session = null;
    messageProducer.close();
    messageProducer = null;
    connection = null;
    connected = false;
    System.out.println("!!!!!!!DISCONNECTED!!!!!!!!");
}

public void run() throws Exception {
    // send messages
    for (int i = 0; i < 1000; i++) {
        if (connected) {
            if (session != null & messageProducer != null & queue != null) {
                // send a message
                messageProducer.send(session.createTextMessage(i + " message"));
                System.out.println("Sent message " + i);
            }
        } else {
            // execute your backup logic
            System.out.println("Message " + i + " not sent");

        }
        Thread.sleep(1000);
    }

    messageProducer.close();
    session.close();
    connection.close();
    System.exit(0);
}

public static void main(String[] args) throws Exception {
    FastFailProducer failoverProducer = new FastFailProducer();
    failoverProducer.something = "asdfasdf";
    failoverListener = new FailoverListener(failoverProducer);
    failoverProducer.init();
    failoverProducer.setConnected(true);
    failoverProducer.run();

}

public boolean isConnected() {
    return connected;
}

public void setConnected(boolean connected) {
    this.connected = connected;
}
}

TransportListenerImpl类

TransportListenerImpl class

import java.io.IOException;

import javax.jms.JMSException;

import org.apache.activemq.transport.TransportListener;

public class FailoverListener implements TransportListener {
    private FastFailProducer failProducer;

public FailoverListener(FastFailProducer failProducer) {
    super();
    this.failProducer = failProducer;
}

@Override
public void onCommand(Object arg0) {
}

@Override
public void onException(IOException arg0) {

}

@Override
public void transportInterupted() {
    try {
        failProducer.disconnect();
    } catch (JMSException e) {
        e.printStackTrace();
    }
}

@Override
public void transportResumed() {
    System.out.println("!!!!!!!TRANSPORT RESUMED!!!!!!!!");
    if (!failProducer.isConnected()) {
        try {
            failProducer.init();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
}

推荐答案

我认为您缺少使用故障转移协议的意义.如果使用故障转移,则无需关闭连接及其相关资源,因为故障转移传输将像还原代理之前那样,负责恢复客户端上的所有内容.在事件方法中关闭连接肯定会锁定,因为您不希望这样做.如果您想在代理消失时关闭所有内容,请不要使用故障转移,而应侦听JMS异常侦听器事件挂钩.

I think you are missing the point of using the failover protocol. If you use failover then there's no need to close down the connection and its associated resources as the failover transport will take care of restoring everything on the client end as it was before the broker went down. Closing the connection in the event method will definitely lock as you are not expected to do such a thing. If you want to shut everything down when the broker goes away, don't use failover, and instead listen on the JMS exception listener event hook instead.

这篇关于传输侦听器和ActiveMq重新启动的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆