处理芹菜中的“ post_save”信号 [英] handle `post_save` signal in celery

查看:114
本文介绍了处理芹菜中的“ post_save”信号的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个运行时间较长的任务,需要在插入或更新特定模型后执行。

I have a rather long running task that needs to be executed after inserting or updating an specific model.

我决定使用 post_save 信号,而不是覆盖 save 方法以减少耦合。由于Django信号不是异步信号,因此我不得不将长时间运行的任务作为Celery任务(我们已经在堆栈中完成了)。

I decided to use post_save signal instead of overriding save method to reduce coupling. Since Django signals are not asynchronous I had to do the long running job as a Celery task (which we already have in our stack).

我的信号处理的简化版本函数如下:

A simplified version of my signal handling function is as follows:

@receiver(post_save, sender=MyModel)
def my_model_post_save(sender, instance, **kwargs):
    handle_save_task.apply_async(args=(instance.pk,))

另外,因为作业是异步完成的,所以我传递了对象的主键而不是实例本身。

Also, because the job is done asynchronously I passed the primary key of the object instead of the instance itself.

@app.task(queue='elastic')
def handle_save_task(instance_pk):
    try:
        instance = MyModel.objects.get(pk=instance_pk)
    except ObjectDoesNotExist:
        # Abort
        logger.warning("Saved object was deleted before this task get a chance to be executed [id = %d]" % instance_pk)
    else:
        # Do my things with instance

实际收益令人困惑的是,执行celery任务时,它无法访问新保存的实例。就像它是在保存之前执行的一样! (不是被称为 post _ save的信号吗?具有讽刺意味)

The actual problem is that when the celery task is executed it can't access the newly saved instance. Just like if it was executed prior to saving! (wasn't the signal called post_save? What an irony)

在保存之前执行是指它是否是新实例将其插入数据库,在celery任务中,我得到一个 DoesNotExist 异常,并且在实例已经存在于数据库中并且调用了save方法以更新其某些属性的情况下,在celery任务中获取具有旧属性值的旧实例。

By "executed before saving" I mean if it is a new instance that's being inserted to DB, in the celery task I get an DoesNotExist exception and in cases where instance was already in DB and the save method was called to update some of its properties I get the old instance with old property values in the celery task.

一种解决方法是在延迟几秒钟的情况下运行celery任务,但这显然不是一个好的解决方案并且也不能保证在高负载或长时间网络延迟下正确执行行为。

A workaround is to run celery tasks with a few seconds of delay, but obviously it is not a good solution and also can not guarantee the proper execution behavior under heavy loads or long network delays.

我是完全错误的,还是做了一些细微的修改就能使它正常工作?

Am I doing it completely wrong or with some slight modifications I can make it work?

推荐答案

这可能是由于在事务内执行更新引起的。事务已在Celery Task已经启动之后提交,这使Celery Task在运行时看到您的旧值。

This is probably caused by your update being performed inside a transaction. The transaction is committed after the Celery Task has already started which causes the Celery Task to see your old value when running.

您可以尝试以下更改:

from django.db import transaction

@receiver(post_save, sender=MyModel)
def my_model_post_save(sender, instance, **kwargs):
    transaction.on_commit(lambda: handle_save_task.apply_async(args=(instance.pk,)))

这篇关于处理芹菜中的“ post_save”信号的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆