如何有选择地从AMQP(RabbitMQ)队列中删除消息? [英] How to selectively delete messages from an AMQP (RabbitMQ) queue?

查看:1704
本文介绍了如何有选择地从AMQP(RabbitMQ)队列中删除消息?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我想选择性地从AMQP队列中删除消息,甚至不读取它们.

I'd like to selectively delete messages from an AMQP queue without even reading them.

情况如下:

基于X类型的新信息到达的事实,发送端希望使X类型的消息过期.因为订阅者很可能还没有使用X类型的最新消息,所以发布者应该删除以前的X类型消息并将最新消息放入队列中.整个操作对订户应该是透明的-实际上,他应该使用STOMP之类的简单方法来获取消息.

Sending side wants to expire messages of type X based on a fact that new information of type X arrived. Because it's very probable that the subscriber didn't consume latest message of type X yet, publisher should just delete previous X-type messages and put a newest one into the queue. The whole operation should be transparent to the subscriber - in fact he should use something as simple as STOMP to get the messages.

如何使用AMQP做到这一点?还是在另一个消息传递协议中更方便?

How to do it using AMQP? Or maybe it's more convenient in another messaging protocol?

我想避免复杂的基础架构.所需的全部消息传递如上所述那样简单:一个队列,一个订阅者,一个发布者,但是发布者必须具有根据给定条件临时删除消息的能力.

I'd like to avoid a complicated infrastructure. The whole messaging needed is as simple as above: one queue, one subscriber, one publisher, but the publisher must have an ability to ad-hoc deleting the messages for a given criteria.

发布者客户端将使用Ruby,但实际上,一旦发现如何在协议中使用它,我就会处理任何语言.

The publisher client will use Ruby but actually I'd deal with any language as soon as I discover how to do it in the protocol.

推荐答案

您当前无法在RabbitMQ(或更广泛地说,在AMQP中)自动执行此操作.但是,这是一个简单的解决方法.

You cannot currently do this in RabbitMQ (or more generally, in AMQP) automatically. But, here's an easy workaround.

假设您要发送三种类型的消息:Xs,Ys和Zs.如果我正确理解了您的问题,那么当X消息到达时,您希望经纪人忘记所有其他尚未传递的X消息.

Let's say you want to send three types of messages: Xs, Ys and Zs. If I understand your question correctly, when an X message arrives, you want the broker to forget all other X messages that haven't been delivered.

这在RabbitMQ中非常容易做到:

This is fairly easy to do in RabbitMQ:

  • 生产者声明三个队列:X,Y和Z(它们自动绑定到默认交换,它们的名称作为路由键,这正是我们想要的),
  • 发布消息时,生产者首先清除相关队列(因此​​,如果发布的是X消息,则首先清除X队列);这样可以有效地删除过时的邮件,
  • 使用者仅从所需队列中进行消费(X表示X消息,Y表示Y消息,等等);从它的角度来看,它只需要执行basic.get即可获取下一条相关消息.

这意味着当两个生产者在大约同一时间发送相同类型的消息时出现竞争状态.结果是,队列可能同时具有两个(或更多)消息,但是由于消息数是生产者数的上限,并且由于多余的消息会在下一次发布时被清除,这应该不是什么大问题.

This implies a race condition when two producers send the same type of message at the about the same time. The result is that its possible for the a queue to have two (or more) messages at the same time, but since the number of messages is upper-bounded by the number of producers, and since the superfluous messages are purged on the next publish, this shouldn't be much of a problem.

总而言之,此解决方案与最佳解决方案仅相距仅一步之遥,即在发布类型为X的消息之前清除队列X.

To summarize, this solution has just one extra step from the optimal solution, namely purge queue X before publishing a message of type X.

如果需要任何帮助来设置此配置,那么寻求建议的理想场所是Rabbitmq-discuss邮件列表.

If you need any help setting up this configuration, the perfect place to ask for advice is the rabbitmq-discuss mailing list.

这篇关于如何有选择地从AMQP(RabbitMQ)队列中删除消息?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆