如何在 RabbitMQ 中重新排队消息 [英] How to requeue messages in RabbitMQ

查看:42
本文介绍了如何在 RabbitMQ 中重新排队消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

消费者收到消息后,消费者/工人做一些验证,然后调用网络服务.在此阶段,如果发生任何错误或验证失败,我们希望将消息放回最初消费的队列.

After the consumer gets a message, consumer/worker does some validations and then call web service. In this phase, if any error occurs or validation fails, we want the message put back to the queue it was originally consumed from.

我已阅读 RabbitMQ 文档.但是我对reject、nack和cancel方法之间的区别感到困惑.

I have read RabbitMQ documentation. But I am confused about differences between reject, nack and cancel methods.

推荐答案

简短回答:

要重新排队特定消息,您可以选择 basic.rejectbasic.nack 并将 multiple 标志设置为 false.

To requeue specific message you can pick both basic.reject or basic.nack with multiple flag set to false.

basic.consume 调用也可能导致消息重新传递,如果您使用消息确认并且在特定时间消费者上有未确认的消息并且消费者在没有确认它们的情况下退出.

basic.consume calling may also results to messages redelivering if you are using message acknowledge and there are un-acknowledged message on consumer at specific time and consumer exit without ack-ing them.

basic.recover 将重新发送特定频道上所有未确认的消息.

basic.recover will redeliver all un-acked messages on specific channel.

长答案:

basic.rejectbasic.nack 都用于相同的目的 - 删除或重新排队无法由特定消费者处理的消息(在给定时刻,在某些条件下或根本无法处理).它们之间的主要区别在于 basic.nack 支持批量消息处理,而 basic.reject 不支持.

basic.reject and basic.nack both serves to same purpose - drop or requeue message that can't be handled by specific consumer (at the given moment, under certain conditions or at all). The main difference between them is that basic.nack supports bulk messages processing, whilst basic.reject doesn't.

官方 RabbitMQ 网站上的否定确认文章中描述了这种差异:

This difference described in Negative Acknowledgements article on official RabbitMQ web site:

AMQP 规范定义了 basic.reject 方法,该方法允许客户端拒绝单独的、已传递的消息,指示代理丢弃它们或重新排队.不幸的是,basic.reject 不支持批量否定确认消息.

The AMQP specification defines the basic.reject method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them. Unfortunately, basic.reject provides no support for negatively acknowledging messages in bulk.

为了解决这个问题,RabbitMQ 支持 basic.nack 方法,该方法提供了 basic.reject 的所有功能,同时还允许批量处理消息强>.

To solve this, RabbitMQ supports the basic.nack method that provides all the functionality of basic.reject whilst also allowing for bulk processing of messages.

要批量拒绝消息,客户端将 basic.nack 方法的 multiple 标志设置为 true.然后,代理将拒绝所有未确认的、已传递的消息,直到并包括在 basic.nack 方法的 delivery_tag 字段中指定的消息.在这方面,basic.nack 补充了 basic.ack.

To reject messages in bulk, clients set the multiple flag of the basic.nack method to true. The broker will then reject all unacknowledged, delivered messages up to and including the message specified in the delivery_tag field of the basic.nack method. In this respect, basic.nack complements the bulk acknowledgement semantics of basic.ack.

请注意,basic.nack 方法是 RabbitMQ 特定的扩展,而 basic.reject 方法是 AMQP 0.9.1 规范的一部分.

Note, that basic.nack method is RabbitMQ-specific extension while basic.reject method is part of AMQP 0.9.1 specification.

至于 basic.cancel 方法,用于通知服务器客户端停止消息消费.请注意,客户端可能会在 basic.cancel 方法发送接收 cancel-ok 回复之间接收任意消息编号.如果客户端使用消息确认并且它有任何未确认的消息,它们将被移回它们最初被消费的队列.

As to basic.cancel method, it used to notify server that client stops message consuming. Note, that client may receive arbitrary messages number between basic.cancel method sending an receiving the cancel-ok reply. If message acknowledge is used by client and it has any un-acknowledged messages they will be moved back to the queue they originally was consumed from.

basic.recover 在 RabbitMQ 中有一些限制:- basic.recover with requeue=false- 基本.恢复同步性

除了勘误表,根据 RabbitMQ 规范 basic.recover 有部分支持(不支持 requeue=false 的恢复.)

In addition to errata, according to RabbitMQ specs basic.recover has partial support (Recovery with requeue=false is not supported.)

注意基本.消费:

basic.consume 开始时没有自动确认 (noack=false) 并且有一些未确认的待处理消息,然后当消费者被取消(死亡、致命错误、异常等)时,该待处理消息消息将被重新发送.从技术上讲,在消费者释放它们(ack/nack/reject/recover)之前,不会处理待处理的消息(甚至是死信).只有在此之后它们才会被处理(例如,deadlettered).

When basic.consume started without auto-ack (no­ack=false) and there are some pending messages non-acked messages, then when consumer get canceled (dies, fatal error, exception, whatever) that pending messages will be redelivered. Technically, that pending messages will not be processed (even dead-lettered) until consumer release them (ack/nack/reject/recover). Only after that they will be processed (e.g. deadlettered).

例如,假设我们最初连续发布 5 条消息:

For example, let say we post originally 5 message in a row:

Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)

然后消费其中的3个,但不确认它们,然后取消消费.我们会有这样的情况:

And then consume 3 of them, but not ack them, and then cancel consumer. We will have this situation:

Queue(main) (tail) { [4] [3] [2*] [1*] [0*] } (head)

其中 star (*) 表示 redelivered 标志设置为 true.

where star (*) notes that redelivered flag set to true.

假设我们有死信交换集和死信消息队列的情况

Assume that we have situation with dead-lettered exchange set and queue for dead-lettered messages

Exchange(e-main)                                   Exchange(e-dead) 
  Queue(main){x-dead-letter-exchange: "e-dead"}       Queue(dead) 

假设我们发布了 5 条消息,expire 属性设置为 5000(5 秒):

And assume we post 5 message with expire property set to 5000 (5 sec):

Queue(main) (tail) { [4] [3] [2] [1] [0] } (head)
Queue(dead) (tail) { }(head)

然后我们从 main 队列中消费 3 条消息并保持它们 10 秒:

and then we consume 3 message from main queue and hold them for 10 second:

Queue(main) (tail) { [2!] [1!] [0!] } (head)
Queue(dead) (tail) { [4*] [3*] } (head)

其中感叹号 (!) 代表未确认的消息.此类消息无法传递给任何消费者,并且通常无法在管理面板中查看.但是让我们取消消费者,记住,它仍然持有 3 条未确认的消息:

where exclamation mark (!) stands for unacked message. Such messages can't be delivered to any consumer and they normally can't be viewed in management panel. But let's cancel consumer, remember, that it still hold 3 un-acked message:

Queue(main) (tail) { } (head)
Queue(dead) (tail) { [2*] [1*] [0*] [4*] [3*] } (head)

所以现在头部中的 3 条消息放回原始队列,但是由于它们设置了每条消息的 TTL,它们被死信发送到死信队列的尾部(当然,通过死信交换).

So now that 3 messages which was in the head put back to original queue, but as they has per-message TTL set, they are dead-lettered to the tail of dead-letter queue (sure, via dead-letter exchange).

附注:

消费消息又名侦听新消息与直接队列访问(获取一条或多条消息而不关心其他消息)在某种程度上有所不同.请参阅basic.get 更多方法说明.

Consuming message aka listening for new one is somehow different from direct queue access (getting one or more message without taking care of others). See basic.get method description for more.

这篇关于如何在 RabbitMQ 中重新排队消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆