如何根据标准限制并发消息消耗 [英] How to limit concurrent message consuming based on a criteria

查看:0
本文介绍了如何根据标准限制并发消息消耗的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

场景(我已经简化了):

  • 许多最终用户可以从前端Web应用程序(生产者)开始作业(繁重的作业,例如渲染大型PDF)。
  • 作业被发送到单个持久RabbitMQ队列。
  • 许多工作应用程序(使用者)处理这些作业并将结果写回数据存储中。

此相当标准的模式运行良好。

问题:如果用户在同一分钟内启动了10个作业,而在同一时间只有10个工作应用程序处于运行状态,则该最终用户实际上会占用自己的所有计算时间。

问题:如何确保在任何时候只处理每个最终用户的一个作业?(奖金:不能限制某些最终用户(例如管理员))

此外,我不希望前端应用程序阻止终端用户启动并发作业。我只希望最终用户等待他们的并发作业一次完成一个。

解决方案?:我是否应该为每个最终用户动态创建一个自动删除独占队列?如果是,我如何告诉工作应用程序开始使用此队列?如何确保一个(且只有一个)工作进程从此队列中消费?

推荐答案

这样的功能不是RabbitMQ原生提供的。 但是,您可以通过以下方式实现它。不过,您必须使用轮询,这不是很高效(与订阅/发布相比)。您还必须利用ZooKeeper在不同工作人员之间进行协调。

您将创建2个队列:1个高优先级队列(用于管理作业)和1个低优先级队列(用于普通用户作业)。这10个工作进程将从两个队列中检索消息。每个Worker将执行无限循环(理想情况下,当队列为空时具有休眠间隔),其中它将尝试互换地从每个队列检索消息:

  • 对于高优先级队列,工作进程只需检索消息、处理该消息并向队列确认即可。
  • 对于低优先级队列,Worker尝试在ZooKeeper中持有锁(通过写入特定的文件-Znode),如果成功,则读取一条消息,对其进行处理并确认。如果ZooKeeper写入不成功,则其他人持有锁,因此此Worker跳过此步骤并重复循环。

这篇关于如何根据标准限制并发消息消耗的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆