连接失败时重新连接到IBM MQ Queue [英] Reconnecting to IBM MQ Queue on connection failure

查看:140
本文介绍了连接失败时重新连接到IBM MQ Queue的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

以下代码段具有我对IBM MQ队列的连接和订阅逻辑.每当出现连接故障时,我都会使用IConnection.ExceptionListener委托通过队列建立新的连接并重新订阅消息.但是问题是,我可以看到多个队列句柄.如何确保我关闭了先前的连接句柄并在由于网络问题或MQ服务器重启而导致连接中断的地方建立了新连接?

Following code snippet has my connection and subscription logic for an IBM MQ Queue. When ever there is a connection failure, I am using IConnection.ExceptionListener delegate to establish a new connection to by queue and resubscribing for the messages. But the problem is, I can see multiple queue handles. How can I make sure I close the previous connection handle and establish a new connection where ever there is a connection break due to network issues or MQ server restarts?

private CancellationToken _cancellationToken;
private IConnection _connection;
private IConnectionFactory _connectionfactory;
private IMessageConsumer _consumer;
private IDestination _destination;
private MessageFormat _msgFormat;
private IMessageProducer _producer;
private ISession _session;

private void CreateWebsphereQueueConnection () {
    SetConnectionFactory ();

    //Connection
    _connection = _connectionfactory.CreateConnection (null, null);
    _connection.ExceptionListener = new ExceptionListener (OnConnectionException);

    //Session
    _session = _connection.CreateSession (false, AcknowledgeMode.AutoAcknowledge);

    //Destination
    _destination = _session.CreateQueue ("queue://My.Queue.Name");
    _destination.SetIntProperty (XMSC.DELIVERY_MODE, 2);
    _destination.SetIntProperty (XMSC.WMQ_TARGET_CLIENT, 0);

    //Consumer
    _consumer = _session.CreateConsumer (_destination);
}

private IConnectionFactory SetConnectionFactory () {
    XMSFactoryFactory factoryFactory = XMSFactoryFactory.GetInstance (XMSC.CT_WMQ);
    IConnectionFactory cf = factoryFactory.CreateConnectionFactory ();

    // Set the properties
    cf.SetStringProperty (XMSC.WMQ_CHANNEL, ConnectionSettings.Channel);
    cf.SetIntProperty (XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
    cf.SetIntProperty (XMSC.WMQ_FAIL_IF_QUIESCE, XMSC.WMQ_FIQ_YES);
    cf.SetStringProperty (XMSC.WMQ_QUEUE_MANAGER, ConnectionSettings.QueueManager);
    cf.SetStringProperty (XMSC.WMQ_CONNECTION_NAME_LIST, ConnectionSettings.ConnectionList);
    cf.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, ConnectionSettings.ReconnectTimeout);
    cf.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, ConnectionSettings.ReconnectOptions);

    cf.SetStringProperty (XMSC.WMQ_PROVIDER_VERSION, XMSC.WMQ_PROVIDER_VERSION_DEFAULT);
    cf.SetBooleanProperty (XMSC.WMQ_SYNCPOINT_ALL_GETS, true);
    return cf;
}

public override void Subscribe<T> (Action<T> onMessageReceived) {
    try {

        _connection.ExceptionListener = delegate (Exception connectionException) {
            //Using any of these two statements is termination my code. Debugger doesn't move to CreateWebsphereQueueConnection() line of code at all
            //_conection.Stop()
            //_conection.Close()
            CreateWebsphereQueueConnection ();
            Subscribe (onMessageReceived);
        };

        MessageListener messageListener = new MessageListener ((msg) => {
            onMessageReceived (message);
        });
        _consumer.MessageListener = messageListener;

        // Start the connection
        _connection.Start ();
    } catch (Exception ex) {
        //Log exception details
    }
}

推荐答案

IBM.XMS.dll将负责MQ故障转移或使用-r开关重新启动.但是,如果出现重新启动请求连接的客户端重新连接的情况,XMS库将不会尝试重新连接,并且客户将必须手动处理这种情况,如@Shashi和@JoshMc所指出的那样.

IBM.XMS.dll will take care of MQ fail over or restarts done with -r switch. But, if there was a restart wit out asking the connected clients to reconnect, XMS library will not attempt to reconnect and the costumers will have to handle this situation manually as pointed out by @Shashi and @JoshMc.

我不得不处理这种情况,并按如下方式更改我的Connection ExceptionListener对我有帮助:

I had to handle this situation and changing my Connection ExceptionListener as follows helped me:

private CancellationToken _cancellationToken;
private IConnection _connection;
private IConnectionFactory _connectionfactory;
private IMessageConsumer _consumer;
private IDestination _destination;
private MessageFormat _msgFormat;
private IMessageProducer _producer;
private ISession _session;
private bool _reConnectOnConnectionBreak = false;
private bool _connected = false;
private void CreateWebsphereQueueConnection () {
    SetConnectionFactory ();

    while (!_connected || _reConnectOnConnectionBreak) {
        try {
            //Connection
            _connection = _connectionfactory.CreateConnection (null, null);
            _connection.ExceptionListener = new ExceptionListener (OnConnectionException);

            //Session
            _session = _connection.CreateSession (false, AcknowledgeMode.AutoAcknowledge);

            //Destination
            _destination = _session.CreateQueue ("queue://My.Queue.Name");
            _destination.SetIntProperty (XMSC.DELIVERY_MODE, 2);
            _destination.SetIntProperty (XMSC.WMQ_TARGET_CLIENT, 0);

            //Consumer
            _consumer = _session.CreateConsumer (_destination);
            _connected = true;
        } catch (Exception ex) {
            _connected = false;
        }

    }
}

private IConnectionFactory SetConnectionFactory () {
    XMSFactoryFactory factoryFactory = XMSFactoryFactory.GetInstance (XMSC.CT_WMQ);
    IConnectionFactory cf = factoryFactory.CreateConnectionFactory ();

    // Set the properties
    cf.SetStringProperty (XMSC.WMQ_CHANNEL, ConnectionSettings.Channel);
    cf.SetIntProperty (XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
    cf.SetIntProperty (XMSC.WMQ_FAIL_IF_QUIESCE, XMSC.WMQ_FIQ_YES);
    cf.SetStringProperty (XMSC.WMQ_QUEUE_MANAGER, ConnectionSettings.QueueManager);
    cf.SetStringProperty (XMSC.WMQ_CONNECTION_NAME_LIST, ConnectionSettings.ConnectionList);
    cf.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, ConnectionSettings.ReconnectTimeout);
    cf.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, ConnectionSettings.ReconnectOptions);

    cf.SetStringProperty (XMSC.WMQ_PROVIDER_VERSION, XMSC.WMQ_PROVIDER_VERSION_DEFAULT);
    cf.SetBooleanProperty (XMSC.WMQ_SYNCPOINT_ALL_GETS, true);
    return cf;
}

public override void Subscribe<T> (Action<T> onMessageReceived) {
    try {

        _connection.ExceptionListener = delegate (Exception connectionException) {
            XMSException xmsError = (XMSException) connectionException;
            int reasonCode = ((IBM.WMQ.MQException) (xmsError).LinkedException).ReasonCode;
            if (reasonCode == MQC.MQRC_Q_MGR_QUIESCING || reasonCode == MQC.MQRC_CONNECTION_BROKEN) {
                _reConnectOnConnectionBreak = true;
                _connection.Close ();

                CreateWebsphereQueueConnection ();
                Subscribe (onMessageReceived);
                _reConnectOnConnectionBreak = false;
            }
        }

        MessageListener messageListener = new MessageListener ((msg) => {
            onMessageReceived (message);
        });
        _consumer.MessageListener = messageListener;

        // Start the connection
        _connection.Start ();
    } catch (Exception ex) {
        //Log exception details
    }
}

在IBM MQ版本8中没有更好的方法来检查连接IConnection的状态.因此,我不得不使用原因码.在IBM MQ版本9中,我们可以使用服务器公开的其余API来检查连接状态.

There is no better way to check the state of the connection IConnection in IBM MQ version 8. So, I had to use thereason codes. In IBM MQ version 9, we can use the rest API exposed by the server to check the connection state.

这篇关于连接失败时重新连接到IBM MQ Queue的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆