芹菜停工后的特定任务 [英] celery shutdown worker after particular task

查看:53
本文介绍了芹菜停工后的特定任务的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用celery(并发性为1的独占池),并且我希望能够在执行特定任务后关闭工作程序.需要警告的是,我要避免工人在接任任何其他任务后再承担任何可能性.

I'm using celery (solo pool with concurrency=1) and I want to be able to shut down the worker after a particular task has run. A caveat is that I want to avoid any possibility of the worker picking up any further tasks after that one.

这是我在大纲中的尝试:

Here's my attempt in the outline:

from __future__ import absolute_import, unicode_literals
from celery import Celery
from celery.exceptions import WorkerShutdown
from celery.signals import task_postrun

app = Celery()
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    return x + y

@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
    raise WorkerShutdown()

但是,当我运行工人时

celery -A celeryapp  worker --concurrency=1 --pool=solo

并运行任务

add.delay(1,4)

我得到以下信息:

 -------------- celery@sam-APOLLO-2000 v4.0.2 (latentcall)
---- **** ----- 
--- * ***  * -- Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial 2018-03-18 14:08:37
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         __main__:0x7f596896ce90
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 4 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[2018-03-18 14:08:39,892: WARNING/MainProcess] Restoring 1 unacknowledged message(s)

该任务被重新排队,将在另一个工作线程上再次运行,从而导致循环.

The task is re-queued and will be run again on another worker, leading to a loop.

当我在任务本身内移动 WorkerShutdown 异常时,也会发生这种情况.

This also happens when I move the WorkerShutdown exception within the task itself.

@app.task
def add(x, y):
    print(x + y)
    raise WorkerShutdown()

是否有办法在完成特定任务后关闭工作人员,同时避免这种不幸的副作用?

Is there a way I can shut down the worker after a particular task, while avoiding this unfortunate side-effect?

推荐答案

建议关闭工作进程的过程是发送 TERM 信号.这将导致芹菜工作者在完成任何当前正在运行的任务后关闭.如果将 QUIT 信号发送到工作程序的主进程,则工作程序将立即关闭.

The recommended process for shutting down a worker is to send the TERM signal. This will cause a celery worker to shutdown after completing any currently running tasks. If you send a QUIT signal to the worker's main process, the worker will shutdown immediately.

但是,celery文档通常从命令行或通过systemd/initd管理celery方面进行讨论,但是celery还通过 celery.app.control 提供了一个远程工作者控制API.
您可以撤消关闭也是这种方式的工人.

The celery docs, however, usually discuss this in terms of managing celery from a command line or via systemd/initd, but celery additionally provides a remote worker control API via celery.app.control.
You can revoke a task to prevent workers from executing the task. This should prevent the loop you are experiencing. Further, control supports shutdown of a worker in this manner as well.

因此,我想以下内容将使您获得想要的行为.

So I imagine the following will get you the behavior you desire.

@app.task(bind=True)
def shutdown(self):
    app.control.revoke(self.id) # prevent this task from being executed again
    app.control.shutdown() # send shutdown signal to all workers

由于当前无法从任务中确认任务,然后继续执行该任务,因此使用 revoke 的这种方法可以避免此问题,因此,即使再次将任务排队,新员工只会忽略它.

Since it's not currently possible to ack the task from within the task, then continue executing said task, this method of using revoke circumvents this problem so that, even if the task is queued again, the new worker will simply ignore it.

或者,以下内容也将阻止重新交付的任务再次执行...

Alternatively, the following would also prevent a redelivered task from being executed a second time...

@app.task(bind=True)
def some_task(self):
    if self.request.delivery_info['redelivered']:
        raise Ignore() # ignore if this task was redelivered
    print('This should only execute on first receipt of task')

也值得注意 AsyncResult 还有一个 revoke 方法,可为您调用 self.app.control.revoke .

Also worth noting AsyncResult also has a revoke method that calls self.app.control.revoke for you.

这篇关于芹菜停工后的特定任务的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆