如何在RabbitMQ中重新排队消息 [英] How to requeue messages in RabbitMQ
问题描述
使用者收到消息后,使用者/工作人员进行一些验证,然后调用Web服务.在此阶段,如果发生任何错误或验证失败,我们希望将消息放回到最初使用该消息的队列中.
After the consumer gets a message, consumer/worker does some validations and then call web service. In this phase, if any error occurs or validation fails, we want the message put back to the queue it was originally consumed from.
我已阅读RabbitMQ文档.但是我对拒绝,取消和取消方法之间的差异感到困惑.
I have read RabbitMQ documentation. But I am confused about differences between reject, nack and cancel methods.
推荐答案
简短答案:
要重新排队特定消息,您可以选择basic.reject
或basic.nack
并将multiple
标志设置为false.
To requeue specific message you can pick both basic.reject
or basic.nack
with multiple
flag set to false.
basic.consume
调用也可能导致重新发送消息.
basic.consume
calling may also results to messages redelivering if you are using message acknowledge and there are un-acknowledged message on consumer at specific time and consumer exit without ack-ing them.
basic.recover
将在特定频道上重新发送所有未确认的消息.
basic.recover
will redeliver all un-acked messages on specific channel.
详细答案:
basic.reject
和 basic.nack
都用于相同目的-删除或重新排队特定使用者无法处理的消息(在给定的时刻,在某些条件下或根本没有).它们之间的主要区别是basic.nack
支持批量消息处理,而basic.reject
不支持.
basic.reject
and basic.nack
both serves to same purpose - drop or requeue message that can't be handled by specific consumer (at the given moment, under certain conditions or at all). The main difference between them is that basic.nack
supports bulk messages processing, whilst basic.reject
doesn't.
RabbitMQ官方网站上的否定确认文章中描述了这种差异:
This difference described in Negative Acknowledgements article on official RabbitMQ web site:
AMQP规范定义了
basic.reject
方法,该方法允许客户端拒绝各个已传递的消息,指示代理放弃它们或将其重新排队.不幸的是,basic.reject
不支持批量否定确认消息.
The AMQP specification defines the
basic.reject
method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them. Unfortunately,basic.reject
provides no support for negatively acknowledging messages in bulk.
为解决此问题,RabbitMQ支持basic.nack
方法,该方法提供basic.reject
的所有功能,而还允许批量处理消息.
To solve this, RabbitMQ supports the basic.nack
method that provides all the functionality of basic.reject
whilst also allowing for bulk processing of messages.
要批量拒绝邮件,客户端将basic.nack
方法的multiple
标志设置为true
.然后,代理将拒绝所有未确认的已传递消息,直到并包括basic.nack
方法的delivery_tag
字段中指定的消息.在这方面, basic.nack
是对 basic.ack
的批量确认语义./p>
To reject messages in bulk, clients set the multiple
flag of the basic.nack
method to true
. The broker will then reject all unacknowledged, delivered messages up to and including the message specified in the delivery_tag
field of the basic.nack
method. In this respect, basic.nack
complements the bulk acknowledgement semantics of basic.ack
.
请注意,basic.nack
方法是RabbitMQ特定的扩展,而basic.reject
方法是AMQP 0.9.1规范的一部分.
Note, that basic.nack
method is RabbitMQ-specific extension while basic.reject
method is part of AMQP 0.9.1 specification.
关于 basic.cancel
方法,它用于通知服务器客户端停止使用消息.请注意,客户端可能会在basic.cancel
方法之间发送发送cancel-ok
答复的任意消息号.如果客户端使用消息确认,并且有任何未确认的消息,它们将被移回到最初使用它们的队列中.
As to basic.cancel
method, it used to notify server that client stops message consuming. Note, that client may receive arbitrary messages number between basic.cancel
method sending an receiving the cancel-ok
reply. If message acknowledge is used by client and it has any un-acknowledged messages they will be moved back to the queue they originally was consumed from.
basic.recover
有一些限制在RabbitMQ中:它
- basic.recover with requeue = false
- basic.recover同步性
basic.recover
has some limitations in RabbitMQ: it
- basic.recover with requeue=false
- basic.recover synchronicity
除了勘误表之外,根据RabbitMQ规范, basic.recover
具有部分支持(通过不支持requeue = false.)
In addition to errata, according to RabbitMQ specs basic.recover
has partial support (Recovery with requeue=false is not supported.)
有关 basic.consume
:
basic.consume
开始时自动确认(noack=false
)并且有一些未确认的消息未确认,然后当使用者被取消(死亡,致命错误,异常等)时,将重新发送未决消息.从技术上讲,直到消费者释放它们(确认/否定/拒绝/恢复)后,该待处理的消息才被处理(甚至是死信).只有在此之后,它们才会被处理(例如,死信).
When basic.consume
started without auto-ack (noack=false
) and there are some pending messages non-acked messages, then when consumer get canceled (dies, fatal error, exception, whatever) that pending messages will be redelivered. Technically, that pending messages will not be processed (even dead-lettered) until consumer release them (ack/nack/reject/recover). Only after that they will be processed (e.g. deadlettered).
例如,假设我们连续发布了5条消息:
For example, let say we post originally 5 message in a row:
Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)
然后消费其中的3个,但不确认它们,然后取消消费.我们将遇到这种情况:
And then consume 3 of them, but not ack them, and then cancel consumer. We will have this situation:
Queue(main) (tail) { [4] [3] [2*] [1*] [0*] } (head)
其中星号(*
)指出redelivered
标志设置为true
.
where star (*
) notes that redelivered
flag set to true
.
假设我们遇到了死信交换设置并排队等待死信消息的情况
Assume that we have situation with dead-lettered exchange set and queue for dead-lettered messages
Exchange(e-main) Exchange(e-dead)
Queue(main){x-dead-letter-exchange: "e-dead"} Queue(dead)
并假设我们发布了5条消息,其expire
属性设置为5000
(5秒):
And assume we post 5 message with expire
property set to 5000
(5 sec):
Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)
Queue(dead) (tail) { }(head)
,然后我们从main
队列中消耗3条消息,并保留10秒钟:
and then we consume 3 message from main
queue and hold them for 10 second:
Queue(main) (tail) { [2!] [1!] [0!] } (head)
Queue(dead) (tail) { [4*] [3*] } (head)
,其中感叹号(!
)代表未确认的消息.此类消息无法传递给任何消费者,并且通常无法在管理面板中查看.但是,让我们取消消费者,请记住,它仍然保留3条未确认的消息:
where exclamation mark (!
) stands for unacked message. Such messages can't be delivered to any consumer and they normally can't be viewed in management panel. But let's cancel consumer, remember, that it still hold 3 un-acked message:
Queue(main) (tail) { } (head)
Queue(dead) (tail) { [2*] [1*] [0*] [4*] [3*] } (head)
因此,现在头中的3条消息放回了原始队列,但是由于已设置了每个消息的TTL,它们便被死信化为死信队列的尾部(确保通过死信交换) ).
So now that 3 messages which was in the head put back to original queue, but as they has per-message TTL set, they are dead-lettered to the tail of dead-letter queue (sure, via dead-letter exchange).
PS:
消费消息(也就是侦听新消息)与直接队列访问有所不同(获取一条或多条消息而无需照顾其他人).有关更多信息,请参见 basic.get
方法说明.
Consuming message aka listening for new one is somehow different from direct queue access (getting one or more message without taking care of others). See basic.get
method description for more.
这篇关于如何在RabbitMQ中重新排队消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!