使用RX Observable.Interval进行Polly重试 [英] Polly Retry with RX Observable.Interval

查看:102
本文介绍了使用RX Observable.Interval进行Polly重试的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我是Polly的新手,我正在尝试应用重试策略,以便在出现IBMMQ连接问题时可以手动处理重试连接.

I'm new to Polly and I'm trying to apply the Retry policy, so that I can have it manually handling the retry connection in case of IBMMQ connection issue.

请考虑以下代码:

 public class ReconnectException : Exception
{

}

public class QueueMonitor : IObservable<Message>, IDisposable
{
    private readonly MQQueue mqQueue;
    private readonly MQQueueManager queueManager;
    private readonly string queueName;
    private IDisposable timer;
    private readonly object lockObj = new object();
    private bool isChecking;
    private readonly TimeSpan checkingFrequency;
    private readonly List<IObserver<Message>> observers;
    private TimeSpan reconnectInterval;

    private readonly IScheduler scheduler;

    private readonly int maxReconnectCount;

    private static readonly ILog Logger = LogProvider.For<AonQueueManager>();


    private readonly Policy pollyPolicy;

    public QueueMonitor(IConfiguration configuration, string queueName, IScheduler scheduler = null)
    {
        this.queueManager = QueueFactory.GetIstance(configuration);
        this.queueName = queueName;
        this.scheduler = scheduler ?? Scheduler.Default;
        checkingFrequency = configuration.GetValue("checkingFrequency", new TimeSpan(0, 0, 5));
        reconnectInterval = configuration.GetValue("reconnectInterval", new TimeSpan(0, 0, 5));
        maxReconnectCount = configuration.GetValue("maxReconnectCount", 3);
        observers = new List<IObserver<Message>>();

        pollyPolicy = Policy.Handle<ReconnectException>().WaitAndRetry(maxReconnectCount, _ => TimeSpan.FromSeconds(2));

        mqQueue = queueManager.AccessQueue(queueName,
            MQC.MQOO_INPUT_AS_Q_DEF // open queue for input
            + MQC.MQOO_FAIL_IF_QUIESCING); // but not if MQM stopping

    }

    public void Start()
    {
        var x = pollyPolicy.ExecuteAndCapture(CreateTimer);
    }

    private void CreateTimer()
    {

        Logger.DebugFormat("Repeating timer started, checking frequency: {checkingFrequency}", checkingFrequency);
        timer = Observable.Interval(checkingFrequency, scheduler).Subscribe(_ =>
 {
   lock (lockObj)
   {
     if (isChecking) return;

     Logger.Log(LogLevel.Debug, () => "Listening on queues for new messages");
     isChecking = true;

     var mqMsg = new MQMessage();
     var mqGetMsgOpts = new MQGetMessageOptions { WaitInterval = checkingFrequency.Milliseconds };

     // 15 second limit for waiting
     mqGetMsgOpts.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING |
                  MQC.MQCNO_RECONNECT_Q_MGR | MQC.MQOO_INPUT_AS_Q_DEF;
     try
     {
         mqQueue.Get(mqMsg, mqGetMsgOpts);
         if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
         {
             var text = mqMsg.ReadString(mqMsg.MessageLength);

             Logger.Debug($"Message received : [{text}]");

             Message message = new Message { Content = text };
             foreach (var observer in observers)
                 observer.OnNext(message);
         }
         else
         {
             Logger.Warn("Non-text message");
         }
     }
     catch (MQException ex)
     {
         if (ex.Message == MQC.MQRC_NO_MSG_AVAILABLE.ToString())
         {
             Logger.Trace("No messages available");
             //nothing to do, emtpy queue
         }
         else if (ex.Message == MQC.MQRC_CONNECTION_BROKEN.ToString())
         {
             Logger.ErrorException("MQ Exception, trying to recconect", ex);

             throw new ReconnectException();
         }
     }
     finally
     {
         isChecking = false;
     }
 }
});
    }


    public IDisposable Subscribe(IObserver<Message> observer)
    {
        if (!observers.Contains(observer))
            observers.Add(observer);

        return new Unsubscriber(observers, observer);
    }

    public void Dispose()
    {
        ((IDisposable)mqQueue)?.Dispose();
        ((IDisposable)queueManager)?.Dispose();

        timer?.Dispose();
    }
}

public class Unsubscriber : IDisposable
{
    private readonly List<IObserver<Message>> observers;
    private readonly IObserver<Message> observer;

    public Unsubscriber(List<IObserver<Message>> observers, IObserver<Message> observer)
    {
        this.observers = observers;
        this.observer = observer;
    }

    public void Dispose()
    {
        if (observer != null) observers.Remove(observer);
    }
}

我遇到的问题是,当在lamda(throw new ReconnectException();)中引发异常时,Polly不会捕获该异常(并且我理解为什么,因为它在另一个线程上)并且应用程序由于在一个线程上而退出了.不同的线程.

The problem I've is that when an exception is thrown inside the lamda ( throw new ReconnectException();), Polly doesn't catch it (and I understand why, since it's on another thread) and the application quits since it's on a different thread.

此代码是库的一部分,所以我不知道是否在每个项目中都正确处理了Global异常.

This code is a part of a library,so I don't know that if in every project the Global exceptions are correctly handed.

如何让它被Polly的代码抓住"?

How do I get it "catched" by the Polly's code?

预先感谢

推荐答案

问题中发布的代码仅将策略应用于创建计时器的行为(执行CreateTimer()),而不应用于执行的代码通过计时器(.(Subscribe(_ => { })调用内的lambda).

The code posted in the question applies the policy only to the act of creating the timer (the execution of CreateTimer()), not to the code executed by the timer (the lambda inside the .(Subscribe(_ => { }) call).

CreateTimer()的调用被try { } catch { }包围时的行为相同. catch仅涵盖执行CreateTimer()方法的行为,即创建计时器.

This is the same as the behaviour if the call to CreateTimer() was surrounded by a try { } catch { }. The catch would only cover the act of executing the CreateTimer() method, the creation of the timer.

要让Polly策略管理在lambda中引发的异常,需要在lambda中将其应用于预期引发异常的相关语句块/组.

For the Polly policy to govern exceptions thrown within the lambda, it needs to be applied within the lambda, to the relevant block/group of statements which are expected to throw the exception.

例如,您可以编写代码:

For example, you might code:

pollyPolicy.ExecuteAndCapture(() => mqQueue.Get(mqMsg, mqGetMsgOpts));

(具有配置为管理您要处理的特定MQException/s的策略).

(with a policy configured to govern the particular MQException/s you want to handle).

或者您可以将该策略应用于更广泛的语句组,就像使用try { }子句一样.

Or you can apply the policy to a wider group of statements - just as with a try { } clause.

pollyPolicy.ExecuteAndCapture(() => 
{
    // ...
    mqQueue.Get(mqMsg, mqGetMsgOpts));
    // ...
}

这篇关于使用RX Observable.Interval进行Polly重试的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆