根据Celery任务状态更新Django模型字段 [英] Update Django Model Field Based On Celery Task Status
问题描述
在我的模型中,我有一个 status
字段,默认值为'Processing'.在Django管理界面中,用户单击保存"按钮后,表单输入将传递到仅休眠30秒的芹菜任务.
In my model, I have a status
field with a default value of 'Processing'. In the Django admin interface, after user clicks 'Save' button, the form inputs are passed to a celery task that just sleeps for 30 seconds.
那30秒之后,我该怎么做:
After that 30 seconds, how do I:
- 确定芹菜任务是否成功?
- 将模型的
status
字段从处理中"更新为实际状态(例如:完成,失败?
- determine if the celery task was successful?
- update the model's
status
field from 'Processing' to the actual status (ex: Completed, Failed?
models.py
models.py
from django.db import models
class Scorecard(models.Model):
name = models.CharField(max_length=100, unique=True)
status = models.CharField(max_length=20, default='Processing')
def __str__(self):
return self.name
admin.py
from django.contrib import admin
from scorecards.models import Scorecard
from scorecards.tasks import get_report
class ScorecardAdmin(admin.ModelAdmin):
list_display = ['name', 'status']
def save_model(self, request, obj, form, change):
if form.is_valid():
data = form.cleaned_data
name = data['name']
get_report.delay(name)
super().save_model(request, obj, form, change)
admin.site.register(Scorecard, ScorecardAdmin)
tasks.py
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from time import sleep
@shared_task
def get_report(name):
sleep(30)
芹菜任务的实时状态每隔x个时间间隔更新一次 status
字段会很好,但是到目前为止,我真的很好奇该怎么做.
Real-time status of the celery task updating the status
field every x time intervals would be nice, but for now I'm just really curious how to do this at all.
推荐答案
我还没有弄清实时状态(但是),但是一旦任务完成,我确实设法更改了状态.
I didn't figure out real-time status (yet) but did manage to change the status once a task is completed.
这些是下面的主要部分.理解为什么这样做的关键原因是我要像这样在-pool = solo
中启动一个芹菜工人:
These were the main parts below. The critical reason to understand why this works is that I'm starting a celery worker in --pool=solo
like so:
celery -A scorecard worker --pool=solo -l info
这是一个单线程执行池(对于我当前的用途而言很好),但这意味着它将处理第一个任务 get_report(name)
,并在完成后处理 set_task_status(id)
,在其中检查结果的状态并将状态字段设置为实际状态.
This is a single-threaded execution pool (which is fine for my current purposes) but it means it will process the first task get_report(name)
, and when that's done, process the set_task_status(id)
where it checks the status of the result and sets the status field to be whatever the actual status is.
models.py
models.py
class Scorecard(models.Model):
....
task_id = models.CharField(max_length=50)
admin.py
class ScorecardAdmin(admin.ModelAdmin):
...
result = get_report.delay(name)
set_task_status.delay(result.task_id)
...
tasks.py
@shared_task
def get_report(name):
sleep(30)
@shared_task
def set_task_status(id):
instance = Scorecard.objects.get(task_id=id)
task_status = AsyncResult(id)
instance.status = task_status.status
instance.save()
这是我到目前为止所发现的.
This is what I've figured out so far.
这篇关于根据Celery任务状态更新Django模型字段的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!