如何在任何MQ平台上实现此单个并发分布式队列? [英] How can I implement this single concurrency distributed queue in any MQ platform?

查看:84
本文介绍了如何在任何MQ平台上实现此单个并发分布式队列?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我目前正在努力寻找实现特定队列的解决方案,该解决方案具有以下特征:

I am currently struggle to find a solution for implement a specific kind of queue, which require the following traits:

  1. 所有队列必须遵守添加作业的顺序.
  2. 整个队列的并发性为1,这意味着每个队列一次只能执行一个作业,而不是工人.
  3. 将有数千个这样的队列.
  4. 它需要分发并能够扩展(例如,如果我添加一个工作程序)
  1. All queue must respect the order that job were added.
  2. The whole queue will have a concurrency of 1, which means that there will only be one job execute at a time per queue, not worker.
  3. There will be more than a few thousand queue like this.
  4. It need to be distributed and be able to scale (example if I add a worker)

基本上它是一个单进程FIFO队列,这正是我尝试不同的消息队列软件(例如ActiveMQ或RabbitMQ)时想要的,但是一旦我将其扩展到2个worker,它就无法正常工作,因为在这种情况下我希望它可以扩展并保持与单个进程队列完全相同的功能.下面,我附上它在具有多个工作程序的分布式环境中应如何工作的描述.

Basically it is a single process FIFO queue, and this is exactly what I want when tryout different message queue software like ActiveMQ or RabbitMQ, but as soon as I scale it to 2 worker, it just does not work since in this case I want it to scale and maintain exact same feature of single process queue. Below I attach the description of how it should work in a distributed environment with multiple worker.

拓扑外观示例:(请注意,队列工人之间存在多对多关系)

Example of how the topology looks like: (Note that it's a many to many relationship between the Queue and Workers)

其运行方式示例:

+------+-----------------+-----------------+-----------------+
| Step | Worker 1        | Worker 2        | Worker 3        |
+------+-----------------+-----------------+-----------------+
| 1    | Fetch Q/1/Job/1 | Fetch Q/2/Job/1 | Waiting         |
+------+-----------------+-----------------+-----------------+
| 2    | Running         | Running         | Waiting         |
+------+-----------------+-----------------+-----------------+
| 3    | Running         | Done Q/2/Job/1  | Fetch Q/2/Job/2 |
+------+-----------------+-----------------+-----------------+
| 4    | Done Q/1/Job/1  | Fetch Q/1/Job/2 | Running         |
+------+-----------------+-----------------+-----------------+
| 5    | Waiting         | Running         | Running         |
+------+-----------------+-----------------+-----------------+

这可能不是最好的表示,但是它表明,即使在队列1 队列2 中,也有更多的作业,但是工人3确实有在上一个作业完成之前,不要开始获取下一个作业.

Probably this is not the best representation but it show that, even in the Queue 1 and Queue 2, there are more jobs, but Worker 3 does not start fetching the next job until the previous one finish.

这就是我努力寻找好的解决方案的原因.

This is what I struggle to find a good solution.

我已经尝试了许多其他解决方案,例如RabbitMQ,activeMQ,apollo ...这些使我可以创建数千个队列,但是当我尝试使用时,所有这些都将使用worker 3在队列中运行下一个作业.并发是每个工作人员

I have tried a lot of other solution like rabbitMQ, activeMQ, apollo... These allow me to create thousand of queues, but all of them as i try out, will use worker 3 to run the next job in queue. And the concurrency is per worker

有没有可以在任何MQ平台上实现这一目标的解决方案,例如ActiveMQ,RabbitMQ,ZeroMQ等??

Are there any solution out there that can make this possible in any MQ platform, example ActiveMQ, RabbitMQ, ZeroMQ etc..?

谢谢:)

推荐答案

您可以使用Redis列表以及额外的派遣"队列来实现此目的,所有工作人员BRPOP都可以使用此队列.分派队列中的每个作业都用原始队列ID进行标记,当工作人员完成该作业时,它将转到该原始队列,并对分派队列执行RPOPLPUSH,以使下一个作业可用于任何其他工作人员.因此,调度队列将最多具有 num_queues 个元素.

You can achieve this using Redis lists with an additional "dispatch" queue that all workers BRPOP on for their jobs. Each job in the dispatch queue is tagged with the original queue ID, and when the worker has completed the job it goes to this original queue and performs RPOPLPUSH onto the dispatch queue to make the next job available for any other worker. The dispatch queue will therefore have a maximum of num_queues elements.

您需要处理的一件事是当源队列为空时调度队列的初始填充.这可能只是发布者针对初始设置的每个队列的空"标志进行的检查,当原始队列中没有要分发的东西时,也由工作人员进行设置.如果设置了此标志,则发布者可以将第一个作业直接直接LPUSH到调度队列中.

One thing you'll have to handle is the initial population of the dispatch queue when the source queue is empty. This could just be a check done by the publisher against an "empty" flag for each queue that is set initially, and also set by the worker when there is nothing left in the original queue to dispatch. If this flag is set, the publisher can just LPUSH the first job directly onto the dispatch queue.

这篇关于如何在任何MQ平台上实现此单个并发分布式队列?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆