用于删除重复消息的 RabbitMQ 插件 [英] RabbitMQ plugin to remove duplicate messages

查看:17
本文介绍了用于删除重复消息的 RabbitMQ 插件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个用于文档生成的 RabbitMQ 队列.基本上,每个文档都有 typestate(新的、正在处理的、准备好的),所以我使用带有路由键的主题交换,例如 type.state.每次文档更改时,我都会将带有最后文档描述的消息发送到交易所,并且效果很好.

I have a RabbitMQ queues for documents generation. Basically, each document has type and state (new, processing, ready), so I use topic exchange with routing keys like type.state. Every time document changes I send the message with last document description to the exchange and it works good enough.

但有时文档可以被处理两次:

However sometimes document can be processed twice:

  1. 用户发送新文件.所以新消息 report.new 被发送到 exchange.
  2. 当工作人员尚未开始处理文档(队列尚未到达)时,用户更新了文档.发送同一文档的新消息 report.new.
  3. 所以现在工人收到第一条消息并开始他的工作,而文档被更改,所以这项工作完全没有意义.
  1. User send new document. So new message report.new is sent to exchange.
  2. While worker hasn't started document processing (the queue hasn't yet reached) user updated the document. The new message report.new for the same document is sent.
  3. So now worker get the first message and start his work, while the document was changed and so this work is totally senseless.

现在我只是将小代码添加到工作人员中,将消息中的 last_modified 文档键与数据库中的文档键进行比较,如果它们不同,则确认消息.但是我认为这不是最好的解决方案.

For now I'm just add small code into workers, comparing last_modified document key from the message with the one from the database and ack the message if they are not the same. However I don't think this is the best solution.

我的想法是在消息头中添加 ID 并有一些 RabbitMQ 插件,该插件将从队列中删除具有相同 ID 的旧消息.

My idea is to add ID to message headers and have some RabbitMQ plugin which will remove older messages with the same ID from the queue.

谢谢.

附:也许另一个 MQ 引擎在这里有用?例如.也许ActiveMQ有这样的功能?

P.S. Maybe another MQ engine can be useful here? E.g. maybe ActiveMQ has such a feature?

推荐答案

好的,我读过 RabbitMQ 内部架构,发现这是不可能的.所以寻找它的人的方式.

Ok, i've read about RabbitMQ inner architecture and find out it's impossible. So the way around for somebody looking for it.

  1. 仅在消息正文中发送文档 ID
  2. 为 worker 创建一个键值对存储(我为此使用了 memcached).键是 ID 值是此 ID 的最后一次工作人员运行的时间戳.
  3. 当工作人员收到消息时,它会检查消息时间戳是否大于键值存储中的时间戳.如果是,则更新 store 中的时间戳并运行任务,否则跳过它.
  1. Send only document ID in message body
  2. Create a key-value store for worker (i use memcached for this). Key is ID value is timestamp of last worker run for this ID.
  3. When worker receives the message it checks if message timestamp greater then the one from key-value store. If it is, then update timestamp in the store and run the task, otherwise just skip it.

这篇关于用于删除重复消息的 RabbitMQ 插件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆