Celery periodic_task running multiple times in parallel


Problem Description



I have some very simple periodic code using Celery's threading; it simply prints "Pre" and "Post" and sleeps in between. It is adapted from this StackOverflow question and this linked website.

from celery.task import task
from celery.task import periodic_task
from django.core.cache import cache
from time import sleep
import main
import cutout_score
from threading import Lock

import socket
from datetime import timedelta
from celery.decorators import task, periodic_task

def single_instance_task(timeout):
    def task_exc(func):
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            acquire_lock = lambda: cache.add(lock_id, "true", timeout)
            release_lock = lambda: cache.delete(lock_id)
            if acquire_lock():
                try:
                    func()
                finally:
                    release_lock()
        return wrapper
    return task_exc

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
@periodic_task(run_every=timedelta(seconds=2))
def test():
    lock_id = "lock"

    # cache.add fails if the key already exists
    acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
    # memcache delete is very slow, but we have to use it to take
    # advantage of using add() for atomic locking
    release_lock = lambda: cache.delete(lock_id)

    if acquire_lock():
        try:
            print 'pre'
            sleep(20)
            print 'post'
        finally:
            release_lock()
        return
    print 'already in use...'

This code never prints 'already in use...'; the same phenomenon occurs when I use the @single_instance_task decorator.
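For reference, the decorator would be applied underneath @periodic_task roughly like this (a hypothetical sketch; the original post doesn't show the call site, and test_decorated is a made-up name):

# hypothetical call site; the post doesn't show how the decorator is applied
@periodic_task(run_every=timedelta(seconds=2))
@single_instance_task(LOCK_EXPIRE)
def test_decorated():
    print 'pre'
    sleep(20)
    print 'post'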

Do you know what's wrong?

Edit: I've simplified the question so that it doesn't write to memory (using a global or the Django cache); I still never see 'already in use...'


Edit: When I add the following code to my Django settings.py file (adapting the code from https://docs.djangoproject.com/en/dev/topics/cache/), everything works as hoped, but only when I use port 11211 (oddly enough, my server is on port 8000):

CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
        'LOCATION': [
            '127.0.0.1:11211'
        ]
    }
}
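A quick way to confirm that this configuration actually reaches memcached is a round trip through the cache (a minimal sketch, run in e.g. python manage.py shell; the 'probe' key is made up for illustration):

from django.core.cache import cache

cache.set('probe', 'ok', 10)  # 'probe' is a made-up key; should land in
                              # memcached at 127.0.0.1:11211
print cache.get('probe')      # 'ok' if the backend is reachable; the
                              # memcached backend fails silently, so None
                              # here means the server isn't being reached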

Solution

How are you running celeryd? I'm not familiar with a threaded option.

If it's running multi-process, then there are no "global" variables: each worker process has its own memory, so nothing is shared between workers.
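A minimal illustration of that point, using plain multiprocessing rather than Celery (a sketch; the names are made up):

# sketch, not from the original post: module-level state is per-process
from multiprocessing import Process

counter = 0

def bump():
    global counter
    counter += 1  # mutates this process's own copy only
    print "child sees counter = %d" % counter  # always prints 1

if __name__ == '__main__':
    for _ in range(3):
        p = Process(target=bump)
        p.start()
        p.join()
    print "parent sees counter = %d" % counter  # still 0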

If you want a counter shared between all workers, then I'd suggest you use cache.incr.

E.g.:

In [1]: from django.core.cache import cache

In [2]: cache.set('counter',0)

In [3]: cache.incr('counter')
Out[3]: 1

In [4]: cache.incr('counter')
Out[4]: 2
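Inside a task, that could look roughly like this (a sketch reusing the question's imports; it assumes a shared backend such as memcached, and 'test_runs' is a made-up key):

@periodic_task(run_every=timedelta(seconds=2))
def counted_test():
    cache.add('test_runs', 0)       # 'test_runs' is a made-up key; add() is
                                    # a no-op if the key already exists
    runs = cache.incr('test_runs')  # atomic increment, shared by all workers
    print 'this task has run %d times across all workers' % runs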


Update

What happens if you force your tasks to overlap by sleeping, e.g.:

print "Task on %r started" % (self,)
sleep(20)
print "Task on %r stopped" % (self,)

If you don't get "already in use..." when running this more frequently than every 20 seconds, then you know that the cache isn't behaving as expected.


Another Update

Have you set up a cache backend in your Django settings (e.g. memcached)?

If not, you may be using the Dummy Cache, which doesn't actually do any caching and just implements the interface... which sounds like a convincing cause of your problem.
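One quick way to check which backend Django actually resolved (a minimal sketch):

from django.core.cache import cache

print cache.__class__
# dummy.DummyCache: no caching at all -- add() always "succeeds", so every
#   worker acquires the "lock"
# locmem.LocMemCache: per-process -- workers can't see each other's lock
# memcached.MemcachedCache: shared across workers, as the lock requires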
