Python multiprocessing job to Celery task but AttributeError
Question
I made a multiprocessing function like this:
import multiprocessing
import pandas as pd
import numpy as np

def _apply_df(args):
    df, func, kwargs = args
    return df.apply(func, **kwargs)

def apply_by_multiprocessing(df, func, **kwargs):
    workers = kwargs.pop('workers')
    pool = multiprocessing.Pool(processes=workers)
    result = pool.map(_apply_df, [(d, func, kwargs)
                                  for d in np.array_split(df, workers)])
    pool.close()
    return pd.concat(list(result))

def square(x):
    return x ** x

if __name__ == '__main__':
    df = pd.DataFrame({'a': range(10), 'b': range(10)})
    apply_by_multiprocessing(df, square, axis=1, workers=4)  ## run by 4 processors
The apply_by_multiprocessing above can execute a Pandas DataFrame apply in parallel. But when I run it as a Celery task, it raises AttributeError: 'Worker' object has no attribute '_config'.
from celery import shared_task

@shared_task
def my_multiple_job():
    df = pd.DataFrame({'a': range(10), 'b': range(10)})
    apply_by_multiprocessing(df, square, axis=1, workers=4)
The error trace is like this:
File "/Users/yong27/work/goldstar/kinmatch/utils.py", line 14, in apply_by_multiprocessing
    pool = multiprocessing.Pool(processes=workers)
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 118, in Pool
    context=self.get_context())
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 146, in __init__
    self._setup_queues()
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 238, in _setup_queues
    self._inqueue = self._ctx.SimpleQueue()
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 111, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 336, in __init__
    self._rlock = ctx.Lock()
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 66, in Lock
    return Lock(ctx=self.get_context())
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 164, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 60, in __init__
    kind, value, maxvalue, self._make_name(),
File "/usr/local/Cellar/python3/3.4.0/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 118, in _make_name
    return '%s-%s' % (process.current_process()._config['semprefix'],
AttributeError: 'Worker' object has no attribute '_config'
It seems this happens because a Celery worker is not a normal process. How can I solve this problem? I'm using Python 3.4, Django 1.6.2, celery 3.1.10, django-celery 3.1.9, pandas 0.12.0.
Solution

This issue has a good answer in this other question.
Basically, it is a known issue of Celery, and a dirty hack is provided: it worked for me. I just added the following code in the same file where my tasks are defined:
from celery.signals import worker_process_init
from multiprocessing import current_process

@worker_process_init.connect
def fix_multiprocessing(**kwargs):
    try:
        current_process()._config
    except AttributeError:
        current_process()._config = {'semprefix': '/mp'}
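The hack works because multiprocessing builds semaphore names from current_process()._config['semprefix'] (visible in the last frame of the traceback), and Celery's Worker process object does not carry that attribute; the signal handler simply patches it in once per worker process. The same try/except pattern can be exercised outside Celery (a sketch; the function name ensure_mp_config is mine, and in an ordinary Python process the attribute already exists, so the patch is a no-op):

```python
from multiprocessing import current_process

def ensure_mp_config(**kwargs):
    # Patch in the _config dict that multiprocessing.synchronize
    # expects when it builds a semaphore name.
    try:
        current_process()._config
    except AttributeError:
        current_process()._config = {'semprefix': '/mp'}

ensure_mp_config()
print(hasattr(current_process(), '_config'))  # → True
```

Inside a Celery worker, the worker_process_init signal ensures the same patch runs before any task code tries to create a Pool.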