根据Celery任务状态更新Django模型字段 [英] Update Django Model Field Based On Celery Task Status

查看:59
本文介绍了根据Celery任务状态更新Django模型字段的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在我的模型中,我有一个 status 字段,默认值为'Processing'.在Django管理界面中,用户单击保存"按钮后,表单输入将传递到仅休眠30秒的芹菜任务.

In my model, I have a status field with a default value of 'Processing'. In the Django admin interface, after user clicks 'Save' button, the form inputs are passed to a celery task that just sleeps for 30 seconds.

那30秒之后,我该怎么做:

After that 30 seconds, how do I:

  1. 确定芹菜任务是否成功?
  2. 将模型的 status 字段从处理中"更新为实际状态(例如:完成,失败?
  1. determine if the celery task was successful?
  2. update the model's status field from 'Processing' to the actual status (ex: Completed, Failed?

models.py

models.py

from django.db import models

class Scorecard(models.Model):

    name = models.CharField(max_length=100, unique=True)
    status = models.CharField(max_length=20, default='Processing')

    def __str__(self):
        return self.name

admin.py

from django.contrib import admin
from scorecards.models import Scorecard
from scorecards.tasks import get_report

class ScorecardAdmin(admin.ModelAdmin):
    list_display = ['name', 'status']

    def save_model(self, request, obj, form, change):
        if form.is_valid():
            data = form.cleaned_data
            name = data['name']
            get_report.delay(name)
        super().save_model(request, obj, form, change)

admin.site.register(Scorecard, ScorecardAdmin)

tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task
from time import sleep

@shared_task
def get_report(name):
    sleep(30)

芹菜任务的实时状态每隔x个时间间隔更新一次 status 字段会很好,但是到目前为止,我真的很好奇该怎么做.

Real-time status of the celery task updating the status field every x time intervals would be nice, but for now I'm just really curious how to do this at all.

推荐答案

我还没有弄清实时状态(但是),但是一旦任务完成,我确实设法更改了状态.

I didn't figure out real-time status (yet) but did manage to change the status once a task is completed.

这些是下面的主要部分.理解为什么这样做的关键原因是我要像这样在-pool = solo 中启动一个芹菜工人:

These were the main parts below. The critical reason to understand why this works is that I'm starting a celery worker in --pool=solo like so:

celery -A scorecard worker --pool=solo -l info

这是一个单线程执行池(对于我当前的用途而言很好),但这意味着它将处理第一个任务 get_report(name),并在完成后处理 set_task_status(id),在其中检查结果的状态并将状态字段设置为实际状态.

This is a single-threaded execution pool (which is fine for my current purposes) but it means it will process the first task get_report(name), and when that's done, process the set_task_status(id) where it checks the status of the result and sets the status field to be whatever the actual status is.

models.py

models.py

class Scorecard(models.Model):
    ....
    task_id = models.CharField(max_length=50)

admin.py

class ScorecardAdmin(admin.ModelAdmin):
    ...
            result = get_report.delay(name)
            set_task_status.delay(result.task_id)
    ...

tasks.py

@shared_task
def get_report(name):
    sleep(30)

@shared_task
def set_task_status(id):
    instance = Scorecard.objects.get(task_id=id)
    task_status = AsyncResult(id)
    instance.status = task_status.status
    instance.save()

这是我到目前为止所发现的.

This is what I've figured out so far.

这篇关于根据Celery任务状态更新Django模型字段的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆