EasyNetQ-如何重试失败的消息在邮件正文/标题中保留RetryCount吗? [英] EasyNetQ - How to retry failed messages & persist RetryCount in message body/header?

查看:157
本文介绍了EasyNetQ-如何重试失败的消息在邮件正文/标题中保留RetryCount吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用EasyNetQ,需要重试原始队列中的失败消息.问题是:即使我成功地增加了TriedCount变量(在每个msg的正文中),当EasyNetQ在异常发生后将消息发布到默认错误队列时,更新的TriedCount也不在msg中!大概是因为它只是将原始消息转储到错误队列,而没有使用方的更改.

I am using EasyNetQ and need to retry failed messages on the original queue. The problem is: even though I successfully increment the TriedCount variable (in the body of every msg), when EasyNetQ publishes the message to the default error queue after an exception, the updated TriedCount is not in the msg! Presumably because it just dumps the original message to the error queue without the consumer's changes.

更新的TriedCount适用于进程内重新发布,但不适用于通过EasyNetQ Hosepipe或EasyNetQ Management Client重新发布的情况. Hosepipe 生成的文本文件没有更新TriedCount.

The updated TriedCount works for in-process republishes, but not when republished through EasyNetQ Hosepipe or EasyNetQ Management Client. The text files Hosepipe generates do not have the TriedCount updated.

public interface IMsgHandler<T> where T: class, IMessageType
{
    Task InvokeMsgCallbackFunc(T msg);
    Func<T, Task> MsgCallbackFunc { get; set; }
    bool IsTryValid(T msg, string refSubscriptionId); // Calls callback only 
                                                      // if Retry is valid
}

public interface IMessageType
{
    int MsgTypeId { get; }

    Dictionary<string, TryInfo> MsgTryInfo {get; set;}

}

public class TryInfo
{   
    public int TriedCount { get; set; }

    /*Other information regarding msg attempt*/
}

public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
{
    IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);
    // Using the msgHandler allows to add a mediator between EasyNetQ and the actual callback function
    // The mediator can transmit the retried msg or choose to ignore it
    return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
}

我还尝试通过管理API 重新发布自己(粗糙代码):

I have also tried republishing myself through the Management API (rough code):

var client = new ManagementClient("http://localhost", "guest", "guest");
var vhost = client.GetVhostAsync("/").Result;
var errQueue = client.GetQueueAsync("EasyNetQ_Default_Error_Queue", 
vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue, 
Ackmodes.ack_requeue_true);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue, 
crit).Result;

foreach (var errMsg in errMsgs)
{
    var pubRes = client.PublishAsync(client.GetExchangeAsync(errMsg.Exchange, vhost).Result,
                                 new PublishInfo(errMsg.RoutingKey, errMsg.Payload)).Result;
        }

这有效,但仅再次发布到错误队列,而不发布在原始队列上.另外,在此阶段,我不知道如何在邮件正文中添加/更新重试信息.

This works but only publishes to the error queue again, not on the original queue. Also, I don't know how to add/update the retry information in the body of the message at this stage.

我已经浏览了库,以便将标头添加到消息中,但是我没有看看正文中的计数是否没有更新,标题中的计数如何/为什么更新.

I have explored this library to add headers to the message but I don't see if the count in the body is not being updated, how/why would the count in the header be updated.

是否有任何方法可以在不依靠Advanced总线的情况下持久保存TriedCount(在这种情况下,我可能会使用RabbitMQ .Net客户端本身)?

Is there any way to persist the TriedCount without resorting to the Advanced bus (in which case I might use the RabbitMQ .Net client itself)?

推荐答案

以防万一,我最终实现了自己的

Just in case it helps someone else, I eventually implemented my own IErrorMessageSerializer (as opposed to implementing the whole IConsumerErrorStrategy, which seemed like an overkill). The reason I am adding the retry info in the body (instead of the header) is that EasyNetQ doesn't handle complex types in the header (not out-of-the-box anyway). So, using a dictionary gives more control for different consumers. I register the custom serializer at the time of creating the bus like so:

_defaultBus = RabbitHutch.CreateBus(currentConnString, serviceRegister => serviceRegister.Register<IErrorMessageSerializer>(serviceProvider => new RetryEnabledErrorMessageSerializer<IMessageType>(givenSubscriptionId)));

并像下面这样实现了Serialize方法:

And just implemented the Serialize method like so:

 public class RetryEnabledErrorMessageSerializer<T> : IErrorMessageSerializer where T : class, IMessageType
 {
        public string Serialize(byte[] messageBody)
        {
             string stringifiedMsgBody = Encoding.UTF8.GetString(messageBody);
             var objectifiedMsgBody = JObject.Parse(stringifiedMsgBody);

             // Add/update RetryInformation into objectifiedMsgBody here
             // I have a dictionary that saves <key:consumerId, val: TryInfoObj>

             return JsonConvert.SerializeObject(objectifiedMsgBody);
        }
  }

实际的重试由简单的控制台应用程序/windows服务通过EasyNetQ Management API定期进行:

The actual retrying is done by a simple console app/windows service periodically via the EasyNetQ Management API:

            var client = new ManagementClient(AppConfig.BaseAddress, AppConfig.RabbitUsername, AppConfig.RabbitPassword);
            var vhost = client.GetVhostAsync("/").Result;
            var aliveRes = client.IsAliveAsync(vhost).Result;
            var errQueue = client.GetQueueAsync(Constants.EasyNetQErrorQueueName, vhost).Result;
            var crit = new GetMessagesCriteria(long.MaxValue, Ackmodes.ack_requeue_false);
            var errMsgs = client.GetMessagesFromQueueAsync(errQueue, crit).Result;
            foreach (var errMsg in errMsgs)
            {
                var innerMsg = JsonConvert.DeserializeObject<Error>(errMsg.Payload);
                var pubInfo = new PublishInfo(innerMsg.RoutingKey, innerMsg.Message);
                pubInfo.Properties.Add("type", innerMsg.BasicProperties.Type);
                pubInfo.Properties.Add("correlation_id", innerMsg.BasicProperties.CorrelationId);
                pubInfo.Properties.Add("delivery_mode", innerMsg.BasicProperties.DeliveryMode);
                var pubRes = client.PublishAsync(client.GetExchangeAsync(innerMsg.Exchange, vhost).Result,
                     pubInfo).Result;
            }

我的使用者本身是否知道是否启用了重试功能,从而赋予了它更多的控制权,因此它可以选择处理重试的msg或忽略它.一旦被忽略,则味精显然将不会再被尝试.这就是EasyNetQ的工作方式.

Whether retry is enabled or not is known by my consumer itself, giving it more control so it can choose to handle the retried msg or just ignore it. Once ignored, the msg will obviously not be tried again; that's how EasyNetQ works.

这篇关于EasyNetQ-如何重试失败的消息在邮件正文/标题中保留RetryCount吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆