使用add_periodic_task在Celery(celerybeat)中动态设置定期任务 [英] Setting up periodic tasks in Celery (celerybeat) dynamically using add_periodic_task

查看:627
本文介绍了使用add_periodic_task在Celery(celerybeat)中动态设置定期任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我将 Celery 4.0.1 Django 1.10 一起使用,我在计划任务时遇到麻烦(运行任务工作良好)。这是芹菜配置:

  os.environ.setdefault('DJANGO_SETTINGS_MODULE','myapp.settings')
app = Celery('myapp')

app.autodiscover_tasks(lambda:settings.INSTALLED_APPS)

app.conf.BROKER_URL ='amqp:// {}:{} @ {}'。format(settings.AMQP_USER,settings.AMQP_PASSWORD,settings.AMQP_HOST)
app.conf.CELERY_DEFAULT_EXCHANGE ='myapp.celery'
app.conf.CELERY_DEFAULT_QUEUE ='myapp.celery_default'
app.conf.CELERY_TASK_SERIALIZER ='json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_IGNORE_RESULT =真
app.conf.CELERY_DISABLE_RATE_LIMITS =真
app.conf.BROKER_POOL_LIMIT = 2

app.conf.CELERY_QUEUES =(
Queue('myapp.celery_default'),
Queue('myapp.queue1') ,
Queue('myapp.queue2'),
Queue('myapp.queue3'),

然后在tasks.py中有

  @ app.task(queue = 'myapp.queue1')
def my_task(some_id ):
print( Doing with,some_id)

在views.py中想要安排此任务:

  def my_view(request,id):
app.add_periodic_task(10,my_task。 s(id))

然后我执行命令:

  sudo systemctl start rabbitmq.service 
celery -A myapp.celery_app beat -l调试
celery worker -A myapp.celery_app

但是任务从未计划。我在日志中看不到任何东西。该任务之所以有效,是因为如果我认为这样做:

  def my_view(request,id):
my_task。 delay(id)

任务已执行。



如果在我的配置文件中,如果我手动安排任务,则可以这样工作:

  app.conf.CELERYBEAT_SCHEDULE = {
'add-every-30-seconds':{
'task':'tasks.my_task',
'schedule':10.0,
'args':(66 ,)
},
}

我只是无法安排任务动态地。知道吗?

解决方案

编辑:(13/01/2018)



< blockquote>

最新的版本4.1.0 已在此机票#3958 中解决了该主题,并已合并 em>







实际上,您不能在视图级别定义定期任务,因为节拍时间表设置将首先加载,并且无法在运行时重新安排:


add_periodic_task() 函数会将条目添加到幕后的beat_schedule设置中,并且相同的设置也可以用于手动设置定期任务:

  app.con f.CELERYBEAT_SCHEDULE = {
'每隔30秒添加一次:: {
'task':'tasks.my_task',
'schedule':10.0,
'args ':(66,)
},
}


表示如果要使用 add_periodic_task(),则应将其包装在 on_after_configure 处理程序中celery应用程序级别以及运行时的任何修改都不会生效:

  app = Celery()

@ app.on_after_configure.connect
def setup_periodic_tasks(sender,** kwargs):
sender.add_periodic_task(10,my_task.s(66))

doc ,常规celerybeat只需跟踪任务执行即可:


默认的调度程序是 celery.beat.PersistentScheduler ,它只是跟踪上一个


为了能够动态管理定期任务并在运行时重新安排celerybeat,

p>


还有 django-celery-beat 扩展程序,用于将日程表存储在Django数据库中,并提供一个方便的管理界面在运行时管理定期任务


任务将保存在django数据库中,并且调度程序可以在数据库级别的任务模型中进行更新。每当您更新定期任务时,此任务表中的计数器都会增加,并告诉celery beat服务从数据库中重新加载时间表。



一个可能的解决方案可能如下所示:

 来自django_celery_beat.models import PeriodicTask,IntervalSchedule 

schedule = IntervalSchedule.objects .create(每个= 10,period = IntervalSchedule.SECONDS)
task = PeriodicTask.objects.create(interval = schedule,name ='any name',task ='tasks.my_task',args = json.dumps( [66]))

views.py

  def update_task_view(request,id)
task = PeriodicTask.objects.get(name =任务名称)#如果我们假设名称是唯一的
task.args = json.dumps([id])
task.save()


I'm using Celery 4.0.1 with Django 1.10 and I have troubles scheduling tasks (running a task works fine). Here is the celery configuration:

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

app.conf.BROKER_URL = 'amqp://{}:{}@{}'.format(settings.AMQP_USER, settings.AMQP_PASSWORD, settings.AMQP_HOST)
app.conf.CELERY_DEFAULT_EXCHANGE = 'myapp.celery'
app.conf.CELERY_DEFAULT_QUEUE = 'myapp.celery_default'
app.conf.CELERY_TASK_SERIALIZER = 'json'
app.conf.CELERY_ACCEPT_CONTENT = ['json']
app.conf.CELERY_IGNORE_RESULT = True
app.conf.CELERY_DISABLE_RATE_LIMITS = True
app.conf.BROKER_POOL_LIMIT = 2

app.conf.CELERY_QUEUES = (
    Queue('myapp.celery_default'),
    Queue('myapp.queue1'),
    Queue('myapp.queue2'),
    Queue('myapp.queue3'),
)

Then in tasks.py I have:

@app.task(queue='myapp.queue1')
def my_task(some_id):
    print("Doing something with", some_id)

In views.py I want to schedule this task:

def my_view(request, id):
    app.add_periodic_task(10, my_task.s(id))

Then I execute the commands:

sudo systemctl start rabbitmq.service
celery -A myapp.celery_app beat -l debug
celery worker -A myapp.celery_app

But the task is never scheduled. I don't see anything in the logs. The task is working because if in my view I do:

def my_view(request, id):
    my_task.delay(id)

The task is executed.

If in my configuration file if I schedule the task manually, like this it works:

app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.my_task',
        'schedule': 10.0,
        'args': (66,)
    },
}

I just can't schedule the task dynamically. Any idea?

解决方案

EDIT: (13/01/2018)

The latest release 4.1.0 have addressed the subject in this ticket #3958 and has been merged


Actually you can't not define periodic task at the view level, because the beat schedule setting will be loaded first and can not be rescheduled at runtime:

The add_periodic_task() function will add the entry to the beat_schedule setting behind the scenes, and the same setting can also can be used to set up periodic tasks manually:

app.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.my_task',
        'schedule': 10.0,
        'args': (66,)
    },
}

which means if you want to use add_periodic_task() it should be wrapped within an on_after_configure handler at the celery app level and any modification on runtime will not take effect:

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(10, my_task.s(66))

As mentioned in the doc the the regular celerybeat simply keep track of task execution:

The default scheduler is the celery.beat.PersistentScheduler, that simply keeps track of the last run times in a local shelve database file.

In order to be able to dynamically manage periodic tasks and reschedule celerybeat at runtime:

There’s also the django-celery-beat extension that stores the schedule in the Django database, and presents a convenient admin interface to manage periodic tasks at runtime.

The tasks will be persisted in django database and the scheduler could be updated in task model at the db level. Whenever you update a periodic task a counter in this tasks table will be incremented, and tells the celery beat service to reload the schedule from the database.

A possible solution for you could be as follow:

from django_celery_beat.models import PeriodicTask, IntervalSchedule

schedule= IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)
task = PeriodicTask.objects.create(interval=schedule, name='any name', task='tasks.my_task', args=json.dumps([66]))

views.py

def update_task_view(request, id)
    task = PeriodicTask.objects.get(name="task name") # if we suppose names are unique
    task.args=json.dumps([id])
    task.save()

这篇关于使用add_periodic_task在Celery(celerybeat)中动态设置定期任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆