Celery Beat: Limit to single task instance at a time


Problem Description


I have celery beat and celery (four workers) to do some processing steps in bulk. One of those tasks is roughly along the lines of, "for each X that hasn't had a Y created, create a Y."

The task is run periodically at a semi-rapid rate (10sec). The task completes very quickly. There are other tasks going on as well.

I've run into the issue multiple times in which the beat tasks apparently become backlogged, so the same task (from different beat times) ends up being executed simultaneously, causing incorrectly duplicated work. It also appears that the tasks are executed out of order.

  1. Is it possible to limit celery beat to ensure only one outstanding instance of a task at a time? Is setting something like rate_limit=5 on the task the "correct" way of doing this?

  2. Is it possible to ensure that beat tasks are executed in order, e.g. instead of dispatching a task, beat adds it to a task chain?

  3. What's the best way of handling this, short of making those tasks themselves execute atomically and be safe to run concurrently? That was not a restriction I would have expected of beat tasks…

The task itself is defined naïvely:

@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
    # Do things in a database
    return

Here's an actual (cleaned) log:

  • [00:00.000] foocorp.tasks.add_y_to_xs sent. id->#1
  • [00:00.001] Received task: foocorp.tasks.add_y_to_xs[#1]
  • [00:10.009] foocorp.tasks.add_y_to_xs sent. id->#2
  • [00:20.024] foocorp.tasks.add_y_to_xs sent. id->#3
  • [00:26.747] Received task: foocorp.tasks.add_y_to_xs[#2]
  • [00:26.748] TaskPool: Apply #2
  • [00:26.752] Received task: foocorp.tasks.add_y_to_xs[#3]
  • [00:26.769] Task accepted: foocorp.tasks.add_y_to_xs[#2] pid:26528
  • [00:26.775] Task foocorp.tasks.add_y_to_xs[#2] succeeded in 0.0197986490093s: None
  • [00:26.806] TaskPool: Apply #1
  • [00:26.836] TaskPool: Apply #3
  • [01:30.020] Task accepted: foocorp.tasks.add_y_to_xs[#1] pid:26526
  • [01:30.053] Task accepted: foocorp.tasks.add_y_to_xs[#3] pid:26529
  • [01:30.055] foocorp.tasks.add_y_to_xs[#1]: Adding Y for X id #9725
  • [01:30.070] foocorp.tasks.add_y_to_xs[#3]: Adding Y for X id #9725
  • [01:30.074] Task foocorp.tasks.add_y_to_xs[#1] succeeded in 0.0594762689434s: None
  • [01:30.087] Task foocorp.tasks.add_y_to_xs[#3] succeeded in 0.0352867960464s: None

We're currently using Celery 3.1.4 with RabbitMQ as the transport.

EDIT: Dan, here's what I ended up using:

from sqlalchemy import func, select, text
from sqlalchemy.exc import DBAPIError
from contextlib import contextmanager


def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
    lock_fn = (func.pg_advisory_xact_lock_shared
               if shared else
               func.pg_advisory_xact_lock)
    if timeout:
        conn.execute(text('SET statement_timeout TO :timeout'),
                     timeout=timeout)
    try:
        conn.execute(select([lock_fn(lock_id)]))
    except DBAPIError:
        return False
    return True


def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
    lock_fn = (func.pg_try_advisory_xact_lock_shared
               if shared else
               func.pg_try_advisory_xact_lock)
    return conn.execute(select([lock_fn(lock_id)])).scalar()


class DatabaseLockFailed(Exception):
    pass


@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
    """
    Context manager which acquires a PSQL advisory transaction lock with a
    specified name.
    """
    lock_id = hash(name)

    with engine.begin() as conn, conn.begin():
        if block:
            locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
                                                  timeout)
        else:
            locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
        if not locked:
            raise DatabaseLockFailed()
        yield
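For illustration, a hypothetical direct use of db_lock might look like this (the connection URL, lock name, and placeholder work are assumptions, not part of the original post):

from sqlalchemy import create_engine

engine = create_engine('postgresql:///foocorp')  # assumed connection URL

try:
    # block=False makes db_lock raise instead of waiting on a held lock
    with db_lock(engine, 'add_y_to_xs', block=False):
        pass  # work that must not run concurrently goes here
except DatabaseLockFailed:
    pass  # another worker already holds the lock; skip this run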

And the celery task decorator (used only for periodic tasks):

import logging
from functools import wraps

from preo.extensions import db

logger = logging.getLogger(__name__)  # assumed; the original presumably defines logger elsewhere


def locked(name=None, block=True, timeout='1s'):
    """
    Using a PostgreSQL advisory transaction lock, only runs this task if the
    lock is available. Otherwise logs a message and returns `None`.
    """
    def with_task(fn):
        lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)

        @wraps(fn)
        def f(*args, **kwargs):
            try:
                with db_lock(db.engine, name=lock_id, block=block,
                             timeout=timeout):
                    return fn(*args, **kwargs)
            except DatabaseLockFailed:
                logger.error('Failed to get lock.')
                return None
        return f
    return with_task
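Tying it back to the task above, a sketch of how the decorator might be applied (the decorator ordering and block=False are assumptions; @locked sits below @periodic_task so the registered task goes through the lock):

@periodic_task(run_every=timedelta(seconds=10))
@locked(block=False)
def add_y_to_xs():
    # Only the worker that wins the advisory lock does the work; a
    # concurrent duplicate logs an error and returns None immediately.
    return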

Solution

The only way to do this is to implement a locking strategy yourself:

Read the relevant section of the Celery documentation for reference:

Like with cron, the tasks may overlap if the first task does not complete before the next. If that is a concern you should use a locking strategy to ensure only one instance can run at a time (see for example Ensuring a task is only executed one at a time).
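For comparison with the database approach above, a minimal single-instance lock can also be built on a shared cache; here is a sketch using redis-py's SET NX (the Redis connection, key name, and expiry are assumptions, not taken from the documentation's example):

import redis
from contextlib import contextmanager

r = redis.Redis()  # assumed default host/port

@contextmanager
def single_instance(lock_name, expiry=60):
    # SET with nx=True only succeeds if the key does not already exist;
    # the expiry keeps a crashed worker from holding the lock forever.
    acquired = r.set(lock_name, '1', nx=True, ex=expiry)
    try:
        yield bool(acquired)
    finally:
        if acquired:
            r.delete(lock_name)

# Inside the periodic task body:
# with single_instance('lock:add_y_to_xs') as got_lock:
#     if not got_lock:
#         return  # another instance is already running
#     ...  # do the work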
