RabbitMQ插件删除重复的消息 [英] RabbitMQ plugin to remove duplicate messages

查看:402
本文介绍了RabbitMQ插件删除重复的消息的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个RabbitMQ队列用于文档生成.基本上,每个文档都有typestate(新的,正在处理的,就绪的),因此我将主题交换与type.state之类的路由键一起使用.每次文档更改时,我都会将带有最后文档描述的消息发送给交易所,并且效果很好.

I have a RabbitMQ queues for documents generation. Basically, each document has type and state (new, processing, ready), so I use topic exchange with routing keys like type.state. Every time document changes I send the message with last document description to the exchange and it works good enough.

但是有时文档可以处理两次:

However sometimes document can be processed twice:

  1. 用户发送新文档.因此,新消息report.new发送到交换.
  2. 当worker尚未开始文档处理(队列尚未到达)时,用户更新了文档.发送了同一文档的新消息report.new.
  3. 因此,现在工人收到了第一条消息并开始工作,而文档已更改,因此这项工作完全没有意义.
  1. User send new document. So new message report.new is sent to exchange.
  2. While worker hasn't started document processing (the queue hasn't yet reached) user updated the document. The new message report.new for the same document is sent.
  3. So now worker get the first message and start his work, while the document was changed and so this work is totally senseless.

现在,我只是将一些小代码添加到工作程序中,将消息中的last_modified文档密钥与数据库中的文档密钥进行比较,如果消息不相同,则对消息进行确认.但是我认为这不是最好的解决方案.

For now I'm just add small code into workers, comparing last_modified document key from the message with the one from the database and ack the message if they are not the same. However I don't think this is the best solution.

我的想法是将ID添加到邮件标题中,并具有一些RabbitMQ插件,该插件将从队列中删除具有相同ID的较旧邮件.

My idea is to add ID to message headers and have some RabbitMQ plugin which will remove older messages with the same ID from the queue.

谢谢.

P.S.也许另一个MQ引擎在这里有用吗?例如.也许ActiveMQ具有这样的功能?

P.S. Maybe another MQ engine can be useful here? E.g. maybe ActiveMQ has such a feature?

推荐答案

好,我已经阅读了RabbitMQ内部架构,并发现这是不可能的.所以有人寻找它的方法.

Ok, i've read about RabbitMQ inner architecture and find out it's impossible. So the way around for somebody looking for it.

  1. 仅在邮件正文中发送文档ID
  2. 为工作者创建键值存储(为此,我使用memcached).关键是ID值是为此ID运行的上一个工作程序的时间戳.
  3. 当worker收到消息时,它将检查消息时间戳是否大于键值存储中的消息时间戳.如果是这样,则更新存储中的时间戳并运行任务,否则就跳过它.
  1. Send only document ID in message body
  2. Create a key-value store for worker (i use memcached for this). Key is ID value is timestamp of last worker run for this ID.
  3. When worker receives the message it checks if message timestamp greater then the one from key-value store. If it is, then update timestamp in the store and run the task, otherwise just skip it.

这篇关于RabbitMQ插件删除重复的消息的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆