How to ensure task execution order per user using Celery, RabbitMQ and Django?


Question

I'm running Django, Celery and RabbitMQ. What I'm trying to achieve is to ensure that tasks related to one user are executed in order (specifically, one at a time; I don't want task concurrency per user).


  • Whenever a new task is added for a user, it should depend on the most recently added task. Additional functionality might include not adding a task to the queue if a task of this type is already queued for this user and has not started yet.

I've done some research and:



  • I couldn't find a way to link a newly created task with an already queued one in Celery itself; chains seem to be only able to link new tasks.
  • I think that both functionalities are possible to implement with a custom RabbitMQ message handler, though it might be hard to code after all.
  • I've also read about celery-tasktree, and this might be the easiest way to ensure execution order, but how do I link a new task with an already "applied_async" task_tree or queue? Is there any way that I could implement that additional no-duplicate functionality using this package?

Edit: There is also this "lock" example in the Celery cookbook. The concept is fine, but I can't see a way to make it work as intended in my case: if I can't acquire the lock for a user, the task has to be retried, but that means pushing it to the end of the queue.
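For reference, the cookbook pattern can be sketched roughly as below. This is a minimal illustration, not the cookbook's exact code: a plain dict stands in for Django's cache backend (the real recipe uses `cache.add(key, value, timeout)`, which is atomic on backends such as memcached), and the helper names are hypothetical:

```python
# Sketch of a cookbook-style per-user lock. A plain dict stands in for
# Django's cache here purely for illustration.
_locks = {}

def acquire_user_lock(user_id):
    """Return True if the lock for this user was acquired, False otherwise."""
    key = "user_lock_{}".format(user_id)
    if key in _locks:
        return False
    _locks[key] = True
    return True

def release_user_lock(user_id):
    """Release the lock so the next task for this user may run."""
    _locks.pop("user_lock_{}".format(user_id), None)

# Inside a Celery task (hypothetical), the pattern would be:
#
#     if not acquire_user_lock(user_id):
#         raise self.retry(countdown=5)   # re-queued -- hence the ordering problem
#     try:
#         do_work(user_id)
#     finally:
#         release_user_lock(user_id)
```

The `self.retry` call in the comment is exactly where the ordering breaks down: a retried task goes back through the broker rather than waiting in place.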

What would be the best course of action here?

Answer

If you configure the Celery workers so that they can only execute one task at a time (see the worker_concurrency setting), then you could enforce the concurrency that you need on a per-user basis. Using a method like

NUMBER_OF_CELERY_WORKERS = 10

def get_task_queue_for_user(user):
    return "user_queue_{}".format(user.id % NUMBER_OF_CELERY_WORKERS)

to get the task queue based on the user id, every task for a given user will be assigned to the same queue. The workers would need to be configured to only consume tasks from a single task queue.
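Dispatching could then look like the sketch below; `process_user_data` and the worker command line are illustrative assumptions, and the routing function mirrors the one above but takes a raw id so it is easy to exercise:

```python
NUMBER_OF_CELERY_WORKERS = 10

def queue_name_for_user_id(user_id):
    # Same mapping as get_task_queue_for_user above, on a raw integer id.
    return "user_queue_{}".format(user_id % NUMBER_OF_CELERY_WORKERS)

# Hypothetical dispatch -- route the task explicitly at call time:
#
#     process_user_data.apply_async(
#         args=[user.id],
#         queue=queue_name_for_user_id(user.id),
#     )
#
# Each worker would then be pinned to exactly one queue with a
# concurrency of one, e.g.:
#
#     celery -A proj worker --concurrency=1 -Q user_queue_9
```

With ten workers started this way, every user's tasks land in one queue and are drained strictly one at a time.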

It would play out like this:


  1. User 49 triggers a task

  2. The task is sent to user_queue_9

  3. When the one and only Celery worker that is listening to user_queue_9 is ready to consume a new task, the task is executed

This is a hacky answer though, because


  • requiring just a single Celery worker for each queue is a brittle system -- if the Celery worker stops, the whole queue stops

  • the workers are running inefficiently
