celery shutdown worker after particular task
Question
I'm using celery (solo pool with concurrency=1) and I want to be able to shut down the worker after a particular task has run. The caveat is that I want to avoid any possibility of the worker picking up any further tasks after that one.
Here's my attempt, in outline:
from __future__ import absolute_import, unicode_literals

from celery import Celery
from celery.exceptions import WorkerShutdown
from celery.signals import task_postrun

app = Celery()
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    return x + y

@task_postrun.connect(sender=add)
def shutdown(*args, **kwargs):
    raise WorkerShutdown()
However, when I run the worker with
celery -A celeryapp worker --concurrency=1 --pool=solo
and run the task
add.delay(1,4)
I get the following output:
-------------- celery@sam-APOLLO-2000 v4.0.2 (latentcall)
---- **** -----
--- * *** * -- Linux-4.4.0-116-generic-x86_64-with-Ubuntu-16.04-xenial 2018-03-18 14:08:37
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: __main__:0x7f596896ce90
- ** ---------- .> transport: redis://localhost:6379/0
- ** ---------- .> results: redis://localhost/
- *** --- * --- .> concurrency: 4 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[2018-03-18 14:08:39,892: WARNING/MainProcess] Restoring 1 unacknowledged message(s)
The task is re-queued and will be run again on another worker, leading to a loop.
This also happens when I move the WorkerShutdown exception within the task itself:
@app.task
def add(x, y):
    print(x + y)
    raise WorkerShutdown()
Is there a way I can shut down the worker after a particular task, while avoiding this unfortunate side-effect?
Answer
The recommended process for shutting down a worker is to send the TERM signal. This will cause a celery worker to shut down after completing any currently running tasks. If you send a QUIT signal to the worker's main process instead, the worker will shut down immediately.
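To sketch what sending those signals looks like from code (a minimal illustration: a throwaway `sleep` child process stands in for the worker's main process, whose real PID you would normally obtain from a pidfile written via celery's --pidfile option):

```python
import signal
import subprocess

# Stand-in for the celery worker's main process (assumption: in practice
# you would look up the real PID, e.g. from a --pidfile).
proc = subprocess.Popen(["sleep", "30"])

# TERM requests a warm shutdown: celery finishes in-flight tasks, then exits.
# (signal.SIGQUIT instead would request an immediate, cold shutdown.)
proc.send_signal(signal.SIGTERM)
proc.wait()

# A negative return code means the process exited due to that signal number.
print("exited via signal:", -proc.returncode)
```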
The celery docs, however, usually discuss this in terms of managing celery from the command line or via systemd/initd, but celery additionally provides a remote worker control API via celery.app.control.

You can revoke a task to prevent workers from executing it. This should prevent the loop you are experiencing. Further, control supports shutting down a worker in this manner as well.
So I imagine the following will get you the behavior you desire.
@app.task(bind=True)
def shutdown(self):
    # self.request.id is the id of the currently executing task
    app.control.revoke(self.request.id)  # prevent this task from being executed again
    app.control.shutdown()  # send shutdown signal to all workers
Since it's not currently possible to ack the task from within the task and then continue executing it, this approach of using revoke circumvents the problem: even if the task is queued again, the new worker will simply ignore it.
Alternatively, the following would also prevent a redelivered task from being executed a second time:
from celery.exceptions import Ignore

@app.task(bind=True)
def some_task(self):
    if self.request.delivery_info['redelivered']:
        raise Ignore()  # ignore the task if it was redelivered
    print('This should only execute on first receipt of task')
It's also worth noting that AsyncResult has a revoke method that calls self.app.control.revoke for you.