Rabbit MQ unack 消息未返回队列供消费者再次处理 [英] Rabbit MQ unack message not back to queue for consumer to process again

查看:68
本文介绍了Rabbit MQ unack 消息未返回队列供消费者再次处理的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用 RabbitMQ 作为我的队列消息服务器,我使用 .NET C# 客户端.当处理来自队列的消息出现错误时,消息不会确认并仍然卡在队列中,不会像我理解的文档那样再次处理.

I use RabbitMQ as my queue message server, I use .NET C# client. When there is error in processing message from queue, message will not ackknowleage and still stuck in queue not be processed again as the document I understand.

我不知道我是否遗漏了一些配置或代码块.

I don't know if I miss some configurations or block of codes.

我现在的想法是自动手动确认消息,如果错误并手动将此消息再次推送到队列.

My idea now is auto manual ack the message if error and manual push this message to queue again.

我希望有另一个更好的解决方案.

I hope to have another better solution.

非常感谢.

我的代码

        public void Subscribe(string queueName)
    {
        while (!Cancelled)
        {
            try
            {
                if (subscription == null)
                {
                    try
                    {
                        //try to open connection
                        connection = connectionFactory.CreateConnection();
                    }
                    catch (BrokerUnreachableException ex)
                    {
                        //You probably want to log the error and cancel after N tries, 
                        //otherwise start the loop over to try to connect again after a second or so.
                        log.Error(ex);
                        continue;
                    }

                    //crate chanel
                    channel = connection.CreateModel();
                    // This instructs the channel not to prefetch more than one message
                    channel.BasicQos(0, 1, false);
                    // Create a new, durable exchange
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
                    // Create a new, durable queue
                    channel.QueueDeclare(queueName, true, false, false, null);
                    // Bind the queue to the exchange
                    channel.QueueBind(queueName, exchangeName, queueName);
                    //create subscription
                    subscription = new Subscription(channel, queueName, false);
                }

                BasicDeliverEventArgs eventArgs;
                var gotMessage = subscription.Next(250, out eventArgs);//250 millisecond
                if (gotMessage)
                {
                    if (eventArgs == null)
                    {
                        //This means the connection is closed.
                        DisposeAllConnectionObjects();
                        continue;//move to new iterate
                    }

                    //process message

                   channel.BasicAck(eventArgs.DeliveryTag, false);


                }
            }
            catch (OperationInterruptedException ex)
            {
                log.Error(ex);
                DisposeAllConnectionObjects();
            }
        }

        DisposeAllConnectionObjects();
    }

    private void DisposeAllConnectionObjects()
    {
        //dispose subscription
        if (subscription != null)
        {
            //IDisposable is implemented explicitly for some reason.
            ((IDisposable)subscription).Dispose();
            subscription = null;
        }

        //dipose channel
        if (channel != null)
        {
            channel.Dispose();
            channel = null;
        }

        //check if connection is not null and dispose it
        if (connection != null)
        {
            try
            {
                connection.Dispose();
            }
            catch (EndOfStreamException ex)
            {
                log.Error(ex);
            }
            catch (OperationInterruptedException ex)//handle this get error from dispose connection 
            {
                log.Error(ex);
            }
            catch (Exception ex)
            {
                log.Error(ex);
            }
            connection = null;
        }
    }

推荐答案

我想你可能误解了 RabbitMQ 文档.如果消息没有得到消费者的确认,Rabbit broker 会将消息重新排入队列以供消费.我不相信您建议的确认然后重新排队消息的方法是一个好主意,只会使问题变得更加复杂.

I think you may have misunderstood the RabbitMQ documentation. If a message does not get ack'ed from the consumer, the Rabbit broker will requeue the message onto the queue for consumption. I don't believe your suggested method for ack'ing and then requeuing a message is a good idea, and will just make the problem more complex.

如果您想明确拒绝"一条消息,因为消费者在处理它时遇到了问题,您可以使用 Rabbit 的 Nack 功能.

If you want to explicitly "reject" a message because the consumer had a problem processing it, you could use the Nack feature of Rabbit.

例如,在您的 catch 异常块中,您可以使用:

For example, within your catch exception blocks, you could use:

subscription.Model.BasicNack(eventArgs.DeliveryTag, false, true);

这将通知 Rabbit broker 重新排队消息.基本上,您传递传递标签,false 表示它不是多条消息,true 表示重新排队消息.如果您想拒绝消息而不是重新排队,只需将 true 更改为 false.

This will inform the Rabbit broker to requeue the message. Basically, you pass the delivery tag, false to say it is not multiple messages, and true to requeue the message. If you want to reject the message and NOT requeue, just change true to false.

此外,您已经创建了订阅,所以我认为您应该直接对此执行确认,而不是通过频道.

Additionally, you have created a subscription, so I think you should perform your ack's directly on this, not through the channel.

变化:

channel.BasicAck(eventArgs.DeliveryTag, false);

致:

subscription.Ack(); 

这种确认方法更简洁,因为您将所有与订阅相关的内容都保留在订阅对象上,而不是弄乱您已经订阅的频道.

This method of ack'ing is much cleaner since you are then keeping everything subscription-related on the subscription object, rather than messing around with the channel that you've already subscribed to.

这篇关于Rabbit MQ unack 消息未返回队列供消费者再次处理的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆