芹菜任务和定制装饰 [英] celery task and customize decorator

查看:120
本文介绍了芹菜任务和定制装饰的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用django和芹菜(django-celery)进行一个项目。我们的团队决定将所有数据访问代码包含在(app-name)/manager.py 中(不会像 django 方式),并且让(app-name)/task.py中的代码只处理与芹菜的组合和执行任务(所以我们在这个层没有django ORM依赖)。



在我的 manager.py 中,我有这样的东西:

  def get_tag(tag_name):
ctype = ContentType.objects.get_for_model(照片)
try:
tag = Tag.objects.get(name = tag_name)
除了ObjectDoesNotExist:
返回Tag.objects.none()
返回标签

def get_tagged_photos(标签):
ctype = ContentType.objects.get_for_model照片)
返回TaggedItem.objects.filter(content_type__pk = ctype.pk,tag__pk = tag.pk)

def get_tagged_photos_count(标签):
返回get_tagged_photos(标签) ()

在我的task.py中,我喜欢将它们包装成任务aybe使用这些任务来做更复杂的任务),所以我写这个装饰器:

  import manager#同一个应用程序中的模块数据访问函数

class mfunc_to_task(object):
def __init __(mfunc_type ='get'):
self.mfunc_type = mfunc_type

def __call__ (self,f):
def wrapper_f(* args,** kwargs):
callback = kwargs.pop('callback',None)

mfunc = getattr ,f .__ name__)

result = mfunc(* args,** kwargs)
如果回调:
如果self.mfunc_type =='get':
子任务(回调).delay(result)
elif self.mfunc_type =='get_or_create':
subtask(callback).delay(result [0])
else:
subtask回调).delay()
返回结果

返回wrapper_f

然后(仍然在 task.py ):

 #@ task 
@mfunc_to_task()
def get_tag():
pass

#@ task
@mfunc_to_task()
def get_tagged_photos():
pass

#@ task
@mfunc_to_task()
def get_tagged_photos_count():
pass

事情工作正常,没有 @task
但是,在应用这个 @task 装饰器(到芹菜文档指示的顶部)之后,事情就开始崩溃了。显然,每次 mfunc_to_task .__调用__ 被调用时,相同的 task.get_tag 函数将以˚F。所以我每次都使用相同的 wrapper_f ,现在我唯一要做的就是得到一个标签。


$ b $我很新装饰师。任何人都可以帮助我了解这里出了什么问题,还是指出其他方式来完成任务?我真的很讨厌为每个数据访问功能编写相同的任务包装代码。

解决方案

不太确定为什么传递参数



如果您使用此示例:

  @ task()
def add(x,y):
return x + y

可以将一些日志记录添加到MyCoolTask​​中:

 从芹菜导入任务
from celery.registry import tasks

import logging
import celery

logger = logging.getLogger(__ name__)

class MyCoolTask​​(celery.Task):

def __call __(self,* args,** kwargs):
在芹菜任务中这个函数调用run方法,这里你可以在
之前设置一些环境变量,任务
logger.info(开始运行)
return self.run(* args,** kwargs)

def after_return(self,status,retval ,task_id,args,kwargs,einfo):
#exit任务任务状态
logger.info(终止运行)
pass

并创建一个扩展类(扩展MyCoolTask​​,但现在有参数):

 类AddTask(MyCoolTask )

def run(self,x,y):
如果x和y:
result = add(x,y)
logger.info('
$ b返回结果
else:
logger.error('参数中没有x或y)

tasks.register(AddTask )

,并确保您将kwargs作为json数据传递:

  {x:8,y:9} 

我得到结果:

  [2013-03-05 17:30:25,853:INFO / MainProcess]开始运行
[2013-03-05 17:30:25,855:INFO / MainProcess] result = 17
[2013-03-05 17:30:26,739:INFO / MainProcess]结束运行
[2013-03-05 17:30:26,741:INFO / MainProcess]任务iamscheduler.tasks.AddTask [6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0]成功在0.888684988022s:17


I'm working on a project using django and celery(django-celery). Our team decided to wrap all data access code within (app-name)/manager.py(NOT wrap into Managers like the django way), and let code in (app-name)/task.py only dealing with assemble and perform tasks with celery(so we don't have django ORM dependency in this layer).

In my manager.py, I have something like this:

def get_tag(tag_name):
    ctype = ContentType.objects.get_for_model(Photo)
    try:
        tag = Tag.objects.get(name=tag_name)
    except ObjectDoesNotExist:
        return Tag.objects.none()
    return tag

def get_tagged_photos(tag):
    ctype = ContentType.objects.get_for_model(Photo)
    return TaggedItem.objects.filter(content_type__pk=ctype.pk, tag__pk=tag.pk)

def get_tagged_photos_count(tag):
    return get_tagged_photos(tag).count()

In my task.py, I like to wrap them into tasks (then maybe use these tasks to do more complicated tasks), so I write this decorator:

import manager #the module within same app containing data access functions

class mfunc_to_task(object):
    def __init__(mfunc_type='get'):
        self.mfunc_type = mfunc_type

    def __call__(self, f):
        def wrapper_f(*args, **kwargs):
            callback = kwargs.pop('callback', None)

            mfunc = getattr(manager, f.__name__)

            result = mfunc(*args, **kwargs)
            if callback:
                if self.mfunc_type == 'get':
                    subtask(callback).delay(result)
                elif self.mfunc_type == 'get_or_create':
                    subtask(callback).delay(result[0])
                else:
                    subtask(callback).delay()
            return result            

        return wrapper_f

then (still in task.py):

#@task
@mfunc_to_task()
def get_tag():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos():
    pass

#@task
@mfunc_to_task()
def get_tagged_photos_count():
    pass

Things work fine without @task. But, after applying that @task decorator(to the top as celery documentation instructed), things just start to fall apart. Apparently, every time the mfunc_to_task.__call__ gets called, the same task.get_tag function gets passed as f. So I ended up with the same wrapper_f every time, and now the only thing I cat do is to get a single tag.

I'm new to decorators. Any one can help me understand what went wrong here, or point out other ways to achieve the task? I really hate to write the same task wrap code for every of my data access functions.

解决方案

Not quite sure why passing arguments won't work?

if you use this example:

@task()
def add(x, y):
    return x + y

lets add some logging to the MyCoolTask:

from celery import task
from celery.registry import tasks

import logging
import celery

logger = logging.getLogger(__name__)

class MyCoolTask(celery.Task):

    def __call__(self, *args, **kwargs):
        """In celery task this function call the run method, here you can
        set some environment variable before the run of the task"""
        logger.info("Starting to run")
        return self.run(*args, **kwargs)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        #exit point of the task whatever is the state
        logger.info("Ending run")
        pass

and create an extended class (extending MyCoolTask, but now with arguments):

class AddTask(MyCoolTask):

    def run(self,x,y):
        if x and y:
            result=add(x,y)
            logger.info('result = %d' % result)
            return result
        else:
            logger.error('No x or y in arguments')

tasks.register(AddTask)

and make sure you pass the kwargs as json data:

{"x":8,"y":9}

I get the result:

[2013-03-05 17:30:25,853: INFO/MainProcess] Starting to run
[2013-03-05 17:30:25,855: INFO/MainProcess] result = 17
[2013-03-05 17:30:26,739: INFO/MainProcess] Ending run
[2013-03-05 17:30:26,741: INFO/MainProcess] Task iamscheduler.tasks.AddTask[6a62641d-16a6-44b6-a1cf-7d4bdc8ea9e0] succeeded in 0.888684988022s: 17

这篇关于芹菜任务和定制装饰的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆