celery通过在task_postrun信号中提高SystemExit来尝试关闭工作者,但始终挂起并且主进程永不退出 [英] celery trying shutdown worker by raising SystemExit in task_postrun signal but always hangs and the main process never exits

查看:44
本文介绍了celery通过在task_postrun信号中提高SystemExit来尝试关闭工作者,但始终挂起并且主进程永不退出的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试通过task_postrun信号中的葡萄干SystemExit()关闭主要芹菜过程.信号被很好地触发,并且引发了异常,但是工作人员从未完全退出而只是挂在那儿.

I'm trying to shutdown the main celery process by raisin SystemExit() in the task_postrun signal. The signal gets fired just fine, and the exception gets raised, but the worker never completely exits and just hangs there.

我该如何做?

我忘了某个地方的设置吗?

Am I forgetting some setting somewhere?

下面是我为工作人员使用的代码(worker.py):

Below is the code that I'm using for the worker (worker.py):

from celery import Celery
from celery import signals

app = Celery('tasks',
    set_as_current = True,
    broker='amqp://guest@localhost//',
    backend="mongodb://localhost//",
)

app.config_from_object({
    "CELERYD_MAX_TASKS_PER_CHILD": 1,
    "CELERYD_POOL": "solo",
    "CELERY_SEND_EVENTS": True,
    "CELERYD_CONCURRENCY": 1,
    "CELERYD_PREFETCH_MULTIPLIER": 1,
})

def shutdown_worker(**kwargs):
    print("SHUTTING DOWN WORKER (RAISING SystemExit)")
    raise SystemExit()

import tasks

signals.task_postrun.connect(shutdown_worker)

print("STARTING WORKER")
app.worker_main()
print("WORKER EXITED!")

及以下是task.py的代码:

and below is the code for tasks.py:

from celery import Celery,task
from celery.signals import task_postrun
import time

from celery.task import Task

class Blah(Task):
    track_started = True
    acks_late = False

    def run(config, kwargs):
        time.sleep(5)
        return "SUCCESS FROM BLAH"

def get_app():
    celery = Celery('tasks',
        broker='amqp://guest@localhost//',
        backend="mongodb://localhost//"
    )
    return celery

要对此进行测试,我要做的第一件事是运行工作程序代码( python worker.py ),然后将这样的任务排队:

To test this, first thing I do is run the worker code (python worker.py), then I queue up a task like so:

>>> import tasks
>>> results = []
>>> results.append(tasks.Blah.delay({}))

我从工作人员看到的输出是这样:

The output I see from the worker is this:

STARTING WORKER

 -------------- celery@hostname v3.0.9 (Chiastic Slide)
---- **** ----- 
--- * ***  * -- [Configuration]
-- * - **** --- . broker:      amqp://guest@localhost:5672//
- ** ---------- . app:         tasks:0x26f3050
- ** ---------- . concurrency: 1 (solo)
- ** ---------- . events:      ON
- ** ---------- 
- *** --- * --- [Queues]
-- ******* ---- . celery:      exchange:celery(direct) binding:celery
--- ***** ----- 

[2012-11-06 15:59:16,761: WARNING/MainProcess] celery@hostname has started.
[2012-11-06 15:59:21,785: WARNING/MainProcess] SHUTTING DOWN WORKER (RAISING SystemExit)

我期望python代码从调用 app.worker_main()返回,然后打印 WORKER EXITED ,然后使该过程完全退出.这永远不会发生,并且必须对工作进程进行 kill -KILL {PID} 才能消失(它也无法消耗任何其他任务.

I was expecting the python code to return from the call to app.worker_main() and then to print WORKER EXITED and then for the process to exit completely. This never happens, and the worker process has to be kill -KILL {PID}ed for it to go away (it's not able to consume any other tasks either.

我想我的主要问题是:

我如何从 app.worker_main()返回代码?

我希望能够在完成 X 个任务后完全重启工作进程(通过退出进程 COMPLETELY )已执行.

I'd like to be able to completely restart the worker process (by having the process COMPLETELY exit) after X number of tasks have been executed.

更新 我弄清楚了工作程序挂在了什么上-工作程序( WorkController )挂在了对 self的调用上.stop 捕获到 SystemExit 异常后.

UPDATE I figured out what the worker is hanging on - the worker (WorkController) is hanging on a call to self.stop after it catches the SystemExit exception.

推荐答案

在我最终回答自己的问题时讨厌它.

Hate it when I end up answering my own question.

Anywhoo,它阻止了在 WorkController 内部的 Mediator 组件上的联接调用(在上调用 stop()调解器组件,在停靠点内部,它 join s).

Anywhoo, it was blocking on a join call on the Mediator component inside the WorkController (calls stop() on the Mediator component, inside stop, it joins).

我通过禁用所有速率限制来摆脱了 Mediator 组件(默认情况下应该是这个速率,但这不是出于某种原因).

I got rid of the Mediator component by disabling all rate limits (should be this by default, but it's not for some reason).

您可以使用以下设置禁用所有速率限制:

You can disable all rate limits with the setting:

CELERY_DISABLE_ALL_RATE_LIMITS: True

希望这也可以帮助其他人.

Hope this helps somebody else down the road too.

和平

这篇关于celery通过在task_postrun信号中提高SystemExit来尝试关闭工作者,但始终挂起并且主进程永不退出的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆