异常时重新排队消息 [英] Re-queue message on exception

查看:250
本文介绍了异常时重新排队消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在寻找一种可靠的方式来重新排列无法正确处理的消息-目前.

I'm looking for a solid way of re-queuing messages that couldn't be handled properly - at this time.

我一直在查看

I've been looking at http://dotnetcodr.com/2014/06/16/rabbitmq-in-net-c-basic-error-handling-in-receiver/ and it seems that it's supported to requeue messages in the RabbitMQ API.

else //reject the message but push back to queue for later re-try
{
    Console.WriteLine("Rejecting message and putting it back to the queue: {0}", message);
    model.BasicReject(deliveryArguments.DeliveryTag, true);
}

但是我正在使用EasyNetQ. 所以想知道在这里我会怎么做类似的事情.

However I'm using EasyNetQ. So wondering how I would do something similar here.

bus.Subscribe<MyMessage>("my_subscription_id", msg => {
    try
    {
        // do work... could be long running
    }
    catch ()
    {
        // something went wrong - requeue message
    }
});

这甚至是个好方法吗?如果do work超出RabbitMQ服务器等待ACK超时的时间,则消息不是ACK可能会导致问题.

Is this even a good approach? Not ACK the message could cause problems if do work exceeds the wait for ACK timeout by the RabbitMQ server.

推荐答案

所以我想到了这个解决方案.它将用EasyNetQ替换默认的错误策略.

So I came up with this solution. Which replaces the default error strategy by EasyNetQ.

public class DeadLetterStrategy : DefaultConsumerErrorStrategy
{
    public DeadLetterStrategy(IConnectionFactory connectionFactory, ISerializer serializer, IEasyNetQLogger logger, IConventions conventions, ITypeNameSerializer typeNameSerializer)
    : base(connectionFactory, serializer, logger, conventions, typeNameSerializer)
    {
    }

    public override AckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception)
    {
        object deathHeaderObject;
        if (!context.Properties.Headers.TryGetValue("x-death", out deathHeaderObject))
            return AckStrategies.NackWithoutRequeue;

        var deathHeaders = deathHeaderObject as IList;

        if (deathHeaders == null)
            return AckStrategies.NackWithoutRequeue;

        var retries = 0;
        foreach (IDictionary header in deathHeaders)
        {
            var count = int.Parse(header["count"].ToString());
            retries += count;
        }

        if (retries < 3)
            return AckStrategies.NackWithoutRequeue;
        return base.HandleConsumerError(context, exception);
    }
}

您可以这样替换它:

RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>())

您必须使用AdvancedBus,因此必须手动设置所有内容.

You have to use the AdvancedBus so you have to setup everything up manually.

using (var bus = RabbitHutch.CreateBus("host=localhost", serviceRegister => serviceRegister.Register<IConsumerErrorStrategy, DeadLetterStrategy>()))
{
    var deadExchange = bus.Advanced.ExchangeDeclare("exchange.text.dead", ExchangeType.Direct);
    var textExchange = bus.Advanced.ExchangeDeclare("exchange.text", ExchangeType.Direct);
    var queue = bus.Advanced.QueueDeclare("queue.text", deadLetterExchange: deadExchange.Name);
    bus.Advanced.Bind(deadExchange, queue, "");
    bus.Advanced.Bind(textExchange, queue, "");

    bus.Advanced.Consume<TextMessage>(queue, (message, info) => HandleTextMessage(message, info));
}

这将使失败的消息死信3次.之后,它将转到EasyNetQ提供的默认错误队列以进行错误处理. 您可以订阅该队列.

This will dead letter a failed message 3 times. After that it'll go to the default error queue provided by EasyNetQ for error handling. You can subscribe to that queue.

当异常从您的使用者方法中传播出去时,一条消息用字母标上.因此,这将引发一封死信.

A message is dead lettered when an exception propagates out of your consumer method. So this would trigger a dead letter.

static void HandleTextMessage(IMessage<TextMessage> textMessage, MessageReceivedInfo info)
{
    throw new Exception("This is a test!");
}

这篇关于异常时重新排队消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆