如何在RabbitMQ中重新排队消息 [英] How to requeue messages in RabbitMQ

查看:612
本文介绍了如何在RabbitMQ中重新排队消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

使用者收到消息后,使用者/工作人员进行一些验证,然后调用Web服务.在此阶段,如果发生任何错误或验证失败,我们希望将消息放回到最初使用该消息的队列中.

After the consumer gets a message, consumer/worker does some validations and then call web service. In this phase, if any error occurs or validation fails, we want the message put back to the queue it was originally consumed from.

我已阅读RabbitMQ文档.但是我对拒绝,取消和取消方法之间的差异感到困惑.

I have read RabbitMQ documentation. But I am confused about differences between reject, nack and cancel methods.

推荐答案

简短答案:

要重新排队特定消息,您可以选择basic.rejectbasic.nack并将multiple标志设置为false.

To requeue specific message you can pick both basic.reject or basic.nack with multiple flag set to false.

basic.consume调用也可能导致重新发送消息.

basic.consume calling may also results to messages redelivering if you are using message acknowledge and there are un-acknowledged message on consumer at specific time and consumer exit without ack-ing them.

basic.recover将在特定频道上重新发送所有未确认的消息.

basic.recover will redeliver all un-acked messages on specific channel.

详细答案:

basic.reject basic.nack 都用于相同目的-删除或重新排队特定使用者无法处理的消息(在给定的时刻,在某些条件下或根本没有).它们之间的主要区别是basic.nack支持批量消息处理,而basic.reject不支持.

basic.reject and basic.nack both serves to same purpose - drop or requeue message that can't be handled by specific consumer (at the given moment, under certain conditions or at all). The main difference between them is that basic.nack supports bulk messages processing, whilst basic.reject doesn't.

RabbitMQ官方网站上的否定确认文章中描述了这种差异:

This difference described in Negative Acknowledgements article on official RabbitMQ web site:

AMQP规范定义了basic.reject方法,该方法允许客户端拒绝各个已传递的消息,指示代理放弃它们或将其重新排队.不幸的是,basic.reject不支持批量否定确认消息.

The AMQP specification defines the basic.reject method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them. Unfortunately, basic.reject provides no support for negatively acknowledging messages in bulk.

为解决此问题,RabbitMQ支持basic.nack方法,该方法提供basic.reject的所有功能,而还允许批量处理消息.

To solve this, RabbitMQ supports the basic.nack method that provides all the functionality of basic.reject whilst also allowing for bulk processing of messages.

要批量拒绝邮件,客户端将basic.nack方法的multiple标志设置为true.然后,代理将拒绝所有未确认的已传递消息,直到并包括basic.nack方法的delivery_tag字段中指定的消息.在这方面, basic.nack 是对 basic.ack 的批量确认语义./p>

To reject messages in bulk, clients set the multiple flag of the basic.nack method to true. The broker will then reject all unacknowledged, delivered messages up to and including the message specified in the delivery_tag field of the basic.nack method. In this respect, basic.nack complements the bulk acknowledgement semantics of basic.ack.

请注意,basic.nack方法是RabbitMQ特定的扩展,而basic.reject方法是AMQP 0.9.1规范的一部分.

Note, that basic.nack method is RabbitMQ-specific extension while basic.reject method is part of AMQP 0.9.1 specification.

关于 basic.cancel 方法,它用于通知服务器客户端停止使用消息.请注意,客户端可能会在basic.cancel方法之间发送发送cancel-ok答复的任意消息号.如果客户端使用消息确认,并且有任何未确认的消息,它们将被移回到最初使用它们的队列中.

As to basic.cancel method, it used to notify server that client stops message consuming. Note, that client may receive arbitrary messages number between basic.cancel method sending an receiving the cancel-ok reply. If message acknowledge is used by client and it has any un-acknowledged messages they will be moved back to the queue they originally was consumed from.

basic.recover 有一些限制在RabbitMQ中:它 - basic.recover with requeue = false - basic.recover同步性

basic.recover has some limitations in RabbitMQ: it - basic.recover with requeue=false - basic.recover synchronicity

除了勘误表之外,根据RabbitMQ规范, basic.recover具有部分支持(通过不支持requeue = false.)

In addition to errata, according to RabbitMQ specs basic.recover has partial support (Recovery with requeue=false is not supported.)

有关 basic.consume :

basic.consume 开始时自动确认(no­ack=false)并且有一些未确认的消息未确认,然后当使用者被取消(死亡,致命错误,异常等)时,将重新发送未决消息.从技术上讲,直到消费者释放它们(确认/否定/拒绝/恢复)后,该待处理的消息才被处理(甚至是死信).只有在此之后,它们才会被处理(例如,死信).

When basic.consume started without auto-ack (no­ack=false) and there are some pending messages non-acked messages, then when consumer get canceled (dies, fatal error, exception, whatever) that pending messages will be redelivered. Technically, that pending messages will not be processed (even dead-lettered) until consumer release them (ack/nack/reject/recover). Only after that they will be processed (e.g. deadlettered).

例如,假设我们连续发布了5条消息:

For example, let say we post originally 5 message in a row:

Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)

然后消费其中的3个,但不确认它们,然后取消消费.我们将遇到这种情况:

And then consume 3 of them, but not ack them, and then cancel consumer. We will have this situation:

Queue(main) (tail) { [4] [3] [2*] [1*] [0*] } (head)

其中星号(*)指出redelivered标志设置为true.

where star (*) notes that redelivered flag set to true.

假设我们遇到了死信交换设置并排队等待死信消息的情况

Assume that we have situation with dead-lettered exchange set and queue for dead-lettered messages

Exchange(e-main)                                   Exchange(e-dead) 
  Queue(main){x-dead-letter-exchange: "e-dead"}       Queue(dead) 

并假设我们发布了5条消息,其expire属性设置为5000(5秒):

And assume we post 5 message with expire property set to 5000 (5 sec):

Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)
Queue(dead) (tail) { }(head)

,然后我们从main队列中消耗3条消息,并保留10秒钟:

and then we consume 3 message from main queue and hold them for 10 second:

Queue(main) (tail) { [2!] [1!] [0!] } (head)
Queue(dead) (tail) { [4*] [3*] } (head)

,其中感叹号(!)代表未确认的消息.此类消息无法传递给任何消费者,并且通常无法在管理面板中查看.但是,让我们取消消费者,请记住,它仍然保留3条未确认的消息:

where exclamation mark (!) stands for unacked message. Such messages can't be delivered to any consumer and they normally can't be viewed in management panel. But let's cancel consumer, remember, that it still hold 3 un-acked message:

Queue(main) (tail) { } (head)
Queue(dead) (tail) { [2*] [1*] [0*] [4*] [3*] } (head)

因此,现在头中的3条消息放回了原始队列,但是由于已设置了每个消息的TTL,它们便被死信化为死信队列的尾部(确保通过死信交换) ).

So now that 3 messages which was in the head put back to original queue, but as they has per-message TTL set, they are dead-lettered to the tail of dead-letter queue (sure, via dead-letter exchange).

PS:

消费消息(也就是侦听新消息)与直接队列访问有所不同(获取一条或多条消息而无需照顾其他人).有关更多信息,请参见 basic.get 方法说明.

Consuming message aka listening for new one is somehow different from direct queue access (getting one or more message without taking care of others). See basic.get method description for more.

这篇关于如何在RabbitMQ中重新排队消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆