芹菜的回调apply_async [英] Callback for celery apply_async

查看:54
本文介绍了芹菜的回调apply_async的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在应用程序中使用 celery 来运行定期任务。让我们看下面的简单示例

I use celery in my application to run periodic tasks. Let's see simple example below

from myqueue import Queue
@perodic_task(run_every=timedelta(minutes=1))
def process_queue():
    queue = Queue()
    uid, questions = queue.pop()
    if uid is None:
        return

    job = group(do_stuff(q) for q in questions)
    job.apply_async()

def do_stuff(question):
    try:
        ...
    except:
        ...
        raise

当您可以在上面的示例中看到,我使用 celery 运行异步任务,但是(因为它是队列),我需要执行 queue.fail(uid ),否则 do_stuff queue.ack(uid)例外。在这种情况下,在两种情况下从我的任务中进行一些回调是非常清楚和有用的- on_failure on_success

As you can see in the example above, i use celery to run async task, but (since it's a queue) i need to do queue.fail(uid) in case of exception in do_stuff or queue.ack(uid) otherwise. In this situation it would be very clear and usefull to have some callback from my task in both cases - on_failure and on_success.

我看到了一些文档,但从未见过在 apply_async 中使用回调的实践。

I saw some documentation, but never seen practices of using callbacks with apply_async. Is it possible to do that?

推荐答案

子类化Task类并重载on_success和on_failure函数:

Subclass the Task class and overload the on_success and on_failure functions:

class CallbackTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        pass

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        pass


@celery.task(base=CallbackTask)  # this does the trick
def add(x, y):
    return x + y

这篇关于芹菜的回调apply_async的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆