使用celery任务中的多处理池会引发异常 [英] Using multiprocessing pool from celery task raises exception
问题描述
有关此内容的详细阅读:我决定使用RQ代替,当运行使用多处理模块的代码时,RQ不会失败.我建议您使用它.
FOR THOSE READING THIS: I have decided to use RQ instead which doesn't fail when running code that uses the multiprocessing module. I suggest you use that.
我正在尝试使用python 3和redis作为代理在celery任务中使用多处理池(在Mac上运行它).但是,我似乎甚至无法在Celery任务中创建多处理池对象!取而代之的是,我得到了一个奇怪的例外,我真的不知道该怎么办.
I am trying to use a multiprocessing pool from within a celery task using Python 3 and redis as the broker (running it on a Mac). However, I don't seem to be able to even create a multiprocessing Pool object from within the Celery task! Instead, I get a strange exception that I really don't know what to do with.
谁能告诉我如何做到这一点?
Can anyone tell me how to accomplish this?
任务:
from celery import Celery
from multiprocessing.pool import Pool
app = Celery('tasks', backend='redis', broker='redis://localhost:6379/0')
@app.task
def test_pool():
with Pool() as pool:
# perform some task using the pool
pool.close()
return 'Done!'
我使用以下方法将其添加到Celery:
which I add to Celery using:
celery -A tasks worker --loglevel=info
,然后通过以下python脚本运行它:
and then running it via the following python script:
import tasks
tasks.test_pool.delay()
返回以下芹菜输出:
[2015-01-12 15:08:57,571: INFO/MainProcess] Connected to redis://localhost:6379/0
[2015-01-12 15:08:57,583: INFO/MainProcess] mingle: searching for neighbors
[2015-01-12 15:08:58,588: INFO/MainProcess] mingle: all alone
[2015-01-12 15:08:58,598: WARNING/MainProcess] celery@Simons-MacBook-Pro.local ready.
[2015-01-12 15:09:02,425: INFO/MainProcess] Received task: tasks.test_pool[38cab553-3a01-4512-8f94-174743b05369]
[2015-01-12 15:09:02,436: ERROR/MainProcess] Task tasks.test_pool[38cab553-3a01-4512-8f94-174743b05369] raised unexpected: AttributeError("'Worker' object has no attribute '_config'",)
Traceback (most recent call last):
File "/usr/local/lib/python3.4/site-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python3.4/site-packages/celery/app/trace.py", line 438, in __protected_call__
return self.run(*args, **kwargs)
File "/Users/simongray/Code/etilbudsavis/offer-sniffer/tasks.py", line 17, in test_pool
with Pool() as pool:
File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 150, in __init__
self._setup_queues()
File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 243, in _setup_queues
self._inqueue = self._ctx.SimpleQueue()
File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 111, in SimpleQueue
return SimpleQueue(ctx=self.get_context())
File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 336, in __init__
self._rlock = ctx.Lock()
File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 66, in Lock
return Lock(ctx=self.get_context())
File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 163, in __init__
SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 59, in __init__
kind, value, maxvalue, self._make_name(),
File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 117, in _make_name
return '%s-%s' % (process.current_process()._config['semprefix'],
AttributeError: 'Worker' object has no attribute '_config'
推荐答案
这是已知问题芹菜.它源于台球依赖性中引入的一个问题.解决方法是为当前进程手动设置_config
属性.感谢用户@martinth提供以下解决方法.
This is a known issue with celery. It stems from an issue introduced in the billiard dependency. A work-around is to manually set the _config
attribute for the current process. Thanks to user @martinth for the work-around below.
from celery.signals import worker_process_init
from multiprocessing import current_process
@worker_process_init.connect
def fix_multiprocessing(**kwargs):
try:
current_process()._config
except AttributeError:
current_process()._config = {'semprefix': '/mp'}
worker_process_init
挂钩将在工作进程初始化时执行代码.我们只是检查_config
是否存在,如果不存在,则进行设置.
The worker_process_init
hook will execute the code upon worker process initialization. We simply check to see if _config
exists, and set it if it does not.
这篇关于使用celery任务中的多处理池会引发异常的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!