芹菜在执行前使用django数据库撤销任务 [英] Celery revoke task before execute using django database

查看:226
本文介绍了芹菜在执行前使用django数据库撤销任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用Django数据库而不是RabbitMQ来实现并发的原因。



但是,在执行任务之前,我无法解决问题。 >

我发现了一些有关这个问题的答案,但是似乎没有完成,或者我找不到足够的帮助。





如何使用模型扩展芹菜任务表,在不希望任务执行时添加一个布尔字段(撤销)来设置?



谢谢。

解决方案

由于Celery通过ID跟踪任务,所有您真正需要的是能够知道哪些ID已被取消。而不是修改 kombu 内部构件,您可以创建自己的表(或 memcached 等),只是跟踪取消的ID,然后检查当前可取消任务的ID是否在其中。



这是支持远程撤销的传输命令在内部执行:


所有工作节点都保留内存中的已撤销任务ID,内存中的
或磁盘上的持久性(见Persistent revokes)。 (来自Celery文档)


当您使用django传输时,您有责任自己做。在这种情况下,每个任务都需要检查是否已被取消。



所以你的任务的基本形式(添加到实际操作中的日志记录)成为:

  from celery import shared_task 
from celery.exceptions import Ignore
from celery.utils.log import get_task_logger
from .models import task_canceled
logger = get_task_logger(__ name__)

@shared_task
def my_task():
如果task_canceled(my_task.request.id )
raise忽略
logger.info(做我的东西)

你可以扩展&通过各种方式改进这一点,例如通过创建一个基本的CancelableTask类,就像您链接到的其他答案之一,但这是基本形式。你现在缺少的是模型和检查它的功能。



请注意,这种情况下的ID将是一个字符串ID,如 a5644f08-7d30-43ff-a61e-81c165ad9e19 不是整数。您的模型可以这样简单:

  from django.db import models 

class CancellledTask(
task_id = models.CharField(max_length = 200)

def cancel_task(request_id):
CancellledTask.objects.create(task_id = request_id)

def task_canceled(request_id):
返回CancellledTask.objects.filter(task_id = request_id).exists()

现在,您可以通过观察芹菜服务的调试日志来检查行为,方法如下:

  my_task.delay()
models.cancel_task(my_task.delay())


I'm using Django database instead of RabbitMQ for concurrency reasons.

But I can't solve the problem of revoking a task before it execute.

I found some answers about this matter but they don't seem complete or I can't get enough help.

How can I extend celery task table using a model, add a boolean field (revoked) to set when I don't want the task to execute?

Thanks.

解决方案

Since Celery tracks tasks by an ID, all you really need is to be able to tell which IDs have been canceled. Rather than modifying kombu internals, you can create your own table (or memcached etc) that just tracks canceled IDs, then check whether the ID for the current cancelable task is in it.

This is what the transports that support a remote revoke command do internally:

All worker nodes keeps a memory of revoked task ids, either in-memory or persistent on disk (see Persistent revokes). (from Celery docs)

When you use the django transport, you are responsible for doing this yourself. In this case it's up to each task to check whether it has been canceled.

So the basic form of your task (logging added in place of an actual operation) becomes:

from celery import shared_task
from celery.exceptions import Ignore
from celery.utils.log import get_task_logger
from .models import task_canceled
logger = get_task_logger(__name__)

@shared_task
def my_task():
    if task_canceled(my_task.request.id):
        raise Ignore
    logger.info("Doing my stuff")

You can extend & improve this in various ways, such as by creating a base CancelableTask class as in one of the other answers you linked to, but this is the basic form. What you're missing now is the model and the function to check it.

Note that the ID in this case will be a string ID like a5644f08-7d30-43ff-a61e-81c165ad9e19, not an integer. Your model can be as simple as this:

from django.db import models

class CanceledTask(models.Model):
    task_id = models.CharField(max_length=200)

def cancel_task(request_id):
    CanceledTask.objects.create(task_id=request_id)

def task_canceled(request_id):
    return CanceledTask.objects.filter(task_id=request_id).exists()

You can now check the behavior by watching your celery service's debug logs while doing things like:

my_task.delay()
models.cancel_task(my_task.delay())

这篇关于芹菜在执行前使用django数据库撤销任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆