使用RabbitMq锁定和批量获取消息 [英] Locks and batch fetch messages with RabbitMq

查看:1055
本文介绍了使用RabbitMq锁定和批量获取消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试以一种更加非常规的方式使用RabbitMq(尽管目前我可以根据需要选择其他任何消息队列实现).消费者没有将Rabbit推送消息留给我的使用者,而是连接到队列并获取一批N条消息(在此期间,它消耗了一些消息,并且可能拒绝了一些消息),然后跳转到另一个队列,依此类推.这样做是为了实现冗余.如果某些使用者崩溃,则保证所有邮件都会被其他使用者使用.

I'm trying to use RabbitMq in a more unconventional way (though at this point i can pick any other message queue implementation if needed). Instead of leaving Rabbit push messages to my consumers, the consumer connects to a queue and fetches a batch of N messages (during which it consumes some and possible rejects some), after which it jumps to another queue and so on. This is done for redundancy. If some consumers crash all messages are guaranteed to be consumed by some other consumer.

问题是我有多个消费者,并且我不希望他们在同一队列中竞争.有没有办法保证队列上的锁?如果不是,我是否至少可以确保如果2个使用者连接到相同的队列,他们不会读取相同的消息?交易可能在某种程度上对我有帮助,但是我听说有消息称它们将从RabbitMQ中删除.

The problem is that I have multiple consumers and I don't want them to compete over the same queue. Is there a way to guarantee a lock on a queue? If not, can I at least make sure that if 2 consumers are connected to the same queue they don't read the same message? Transactions might help me to some degree but I've heard talk that they'll get removed from RabbitMQ.

也欢迎其他体系结构建议.

Other architectural suggestions are welcomed too.

谢谢!

正如评论中指出的那样,我需要如何处理消息有一个特殊性.它们仅在分组中才有意义,并且相关消息很有可能在队列中聚集在一起.例如,如果我拉出一批100条消息,则很有可能对消息1-3、4-5、6-10等执行某项操作.如果我找不到某些消息的组,则我将它们重新提交到队列中. WorkQueue不起作用,因为它将消息从同一组传播到多个不知道如何处理它们的工作人员.

As pointed in the comment there's an a particularity in how I need to process the messages. They only make sense taken in groups and there's a high probability that related messages are clumped together in a queue. If for example I pull a batch of 100 messages, there's a high probability that I'll be able to do something with messages 1-3, 4-5,6-10 etc. If I fail to find a group for some messages I'll resubmit them to the queue. WorkQueue wouldn't work because it would spread messages from the same group to multiple workers that wouldn't know what to do with them.

推荐答案

您是否已在企业集成上浏览了这本免费的在线图书模式?

听起来您确实需要一个工作流程,在您将消息发送给您的工作人员之前,您拥有一个批处理程序组件.使用RabbitMQ,有两种方法可以做到这一点.使用可以为您完成批处理的交换类型(和消息格式),或者使用一个队列,并使用工作人员对批处理进行排序并将每个批处理置于其自己的队列中.批处理程序可能还应该向控制队列发送批处理就绪"消息,以便工作人员可以发现新的批处理队列的存在.处理完批次后,工作人员可以删除批次队列.

It sounds like you really need a workflow where you have a batcher component before the messages get to your workers. With RabbitMQ there are two ways to do that. Either use an exchange type (and message format) that can do the batching for you, or have one queue, and a worker that sorts out batches and places each batch on its own queue. The batcher should probably also send a "batch ready" message to a control queue so that a worker can discover the existence of the new batch queue. Once the batch is processed the worker could delete the batch queue.

如果您可以控制消息格式,则可以使RabbitMQ以两种方式隐式地进行批处理.通过主题交换,您可以确保每条消息上的路由密钥的格式均为work.batchid.something,然后得知批次xxyzz存在的工作人员将使用#.xxyzz.#之类的绑定密钥来仅消耗这些消息.无需重新发布.

If you have control over the message format, you might be able to get RabbitMQ to do the batching implicitly in a couple of ways. With a topic exchange, you could make sure that the routing key on each message is of the format work.batchid.something and then a worker that learns of the existence of batch xxyzz would use a binding key like #.xxyzz.# to only consume those messages. No republishing needed.

另一种方法是在标头中包含批次ID,并使用较新的标头交换类型.当然,如果您愿意编写少量的Erlang代码,也可以实现自己的自定义交换类型.

The other way is to include a batch id in a header and use the newer headers exchange type. Of course you can also implement your own custom exchange types if you are willing to write a small amount of Erlang code.

尽管如此,我还是建议您阅读本书,因为与大多数人开始时使用的典型工作队列概念相比,它提供了更好的消息传递体系结构概述.

I do recommend checking the book though, because it gives a better overview of messaging architecture than the typical worker queue concept that most people start with.

这篇关于使用RabbitMq锁定和批量获取消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆