如何有选择地从 AMQP (RabbitMQ) 队列中删除消息? [英] How to selectively delete messages from an AMQP (RabbitMQ) queue?

查看:44
本文介绍了如何有选择地从 AMQP (RabbitMQ) 队列中删除消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想有选择地从 AMQP 队列中删除消息,甚至不读取它们.

I'd like to selectively delete messages from an AMQP queue without even reading them.

场景如下:

发送方希望基于 X 类型的新信息到达的事实使 X 类型的消息过期.因为订阅者很可能还没有消费最新的 X 类型消息,发布者应该删除以前的 X 类型消息并将最新的消息放入队列中.整个操作应该对订阅者透明——事实上他应该使用像 STOMP 这样简单的东西来获取消息.

Sending side wants to expire messages of type X based on a fact that new information of type X arrived. Because it's very probable that the subscriber didn't consume latest message of type X yet, publisher should just delete previous X-type messages and put a newest one into the queue. The whole operation should be transparent to the subscriber - in fact he should use something as simple as STOMP to get the messages.

如何使用 AMQP 做到这一点?或者也许在另一种消息传递协议中更方便?

How to do it using AMQP? Or maybe it's more convenient in another messaging protocol?

我想避免使用复杂的基础架构.所需的整个消息传递与上述一样简单:一个队列、一个订阅者、一个发布者,但发布者必须能够根据给定条件临时删除消息.

I'd like to avoid a complicated infrastructure. The whole messaging needed is as simple as above: one queue, one subscriber, one publisher, but the publisher must have an ability to ad-hoc deleting the messages for a given criteria.

发布者客户端将使用 Ruby,但实际上,一旦我发现如何在协议中执行此操作,我就会处理任何语言.

The publisher client will use Ruby but actually I'd deal with any language as soon as I discover how to do it in the protocol.

推荐答案

您目前无法在 RabbitMQ(或更一般地,在 AMQP)中自动执行此操作.但是,这里有一个简单的解决方法.

You cannot currently do this in RabbitMQ (or more generally, in AMQP) automatically. But, here's an easy workaround.

假设您要发送三种类型的消息:Xs、Ys 和 Zs.如果我正确理解您的问题,当 X 消息到达时,您希望代理忘记所有其他尚未发送的 X 消息.

Let's say you want to send three types of messages: Xs, Ys and Zs. If I understand your question correctly, when an X message arrives, you want the broker to forget all other X messages that haven't been delivered.

这在 RabbitMQ 中很容易做到:

This is fairly easy to do in RabbitMQ:

  • 生产者声明了三个队列:X、Y 和 Z(它们会自动绑定到默认交换器,它们的名称作为路由键,这正是我们想要的),
  • 在发布消息时,生产者首先清除相关队列(因此​​,如果它正在发布 X 消息,它首先清除 X 队列);这有效地删除了过时的消息,
  • 消费者只是从它想要的队列中消费(X 代表 X 条消息,Y 代表 Y 条消息,等等);从它的角度来看,它只需要执行 basic.get 即可获取下一条相关消息.

这意味着当两个生产者大约同时发送相同类型的消息时会出现竞争条件.结果是 a 队列可能同时有两个(或更多)消息,但由于消息数量上限为生产者数量,并且由于多余的消息在下一次发布时被清除,这应该不是什么大问题.

This implies a race condition when two producers send the same type of message at the about the same time. The result is that its possible for the a queue to have two (or more) messages at the same time, but since the number of messages is upper-bounded by the number of producers, and since the superfluous messages are purged on the next publish, this shouldn't be much of a problem.

总而言之,该解决方案与最佳解决方案相比只有一个额外的步骤,即在发布类型 X 的消息之前清除队列 X.

To summarize, this solution has just one extra step from the optimal solution, namely purge queue X before publishing a message of type X.

如果您在设置此配置时需要任何帮助,寻求建议的最佳地点是 rabbitmq-discuss 邮件列表.

If you need any help setting up this configuration, the perfect place to ask for advice is the rabbitmq-discuss mailing list.

这篇关于如何有选择地从 AMQP (RabbitMQ) 队列中删除消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆