将defaultdict与多处理一起使用? [英] Using defaultdict with multiprocessing?
问题描述
只是进行试验和学习,我知道如何创建可以使用多个过程访问的共享字典,但是我不确定如何使字典保持同步.我相信defaultdict
可以说明我遇到的问题.
from collections import defaultdict
from multiprocessing import Pool, Manager, Process
#test without multiprocessing
s = 'mississippi'
d = defaultdict(int)
for k in s:
d[k] += 1
print d.items() # Success! result: [('i', 4), ('p', 2), ('s', 4), ('m', 1)]
print '*'*10, ' with multiprocessing ', '*'*10
def test(k, multi_dict):
multi_dict[k] += 1
if __name__ == '__main__':
pool = Pool(processes=4)
mgr = Manager()
multi_d = mgr.dict()
for k in s:
pool.apply_async(test, (k, multi_d))
# Mark pool as closed -- no more tasks can be added.
pool.close()
# Wait for tasks to exit
pool.join()
# Output results
print multi_d.items() #FAIL
print '*'*10, ' with multiprocessing and process module like on python site example', '*'*10
def test2(k, multi_dict2):
multi_dict2[k] += 1
if __name__ == '__main__':
manager = Manager()
multi_d2 = manager.dict()
for k in s:
p = Process(target=test2, args=(k, multi_d2))
p.start()
p.join()
print multi_d2 #FAIL
第一个结果有效(因为它不使用multiprocessing
),但是我在使其与multiprocessing
一起工作时遇到了问题.我不确定如何解决它,但我认为可能是由于它未同步(并在以后加入结果),或者可能是因为在multiprocessing
中我无法弄清楚如何将defaultdict(int)
设置为字典.>
任何有关如何使它正常工作的帮助或建议都很棒!
您可以将BaseManager
子类化,并注册其他类型以进行共享.如果默认AutoProxy
生成的类型不起作用,则需要提供合适的代理类型.对于defaultdict
,如果只需要访问dict
中已经存在的属性,则可以使用DictProxy
.
from multiprocessing import Pool
from multiprocessing.managers import BaseManager, DictProxy
from collections import defaultdict
class MyManager(BaseManager):
pass
MyManager.register('defaultdict', defaultdict, DictProxy)
def test(k, multi_dict):
multi_dict[k] += 1
if __name__ == '__main__':
pool = Pool(processes=4)
mgr = MyManager()
mgr.start()
multi_d = mgr.defaultdict(int)
for k in 'mississippi':
pool.apply_async(test, (k, multi_d))
pool.close()
pool.join()
print multi_d.items()
Just experimenting and learning, and I know how to create a shared dictionary that can be accessed with multiple proceses but I'm not sure how to keep the dict synced. defaultdict
, I believe, illustrates the problem I'm having.
from collections import defaultdict
from multiprocessing import Pool, Manager, Process
#test without multiprocessing
s = 'mississippi'
d = defaultdict(int)
for k in s:
d[k] += 1
print d.items() # Success! result: [('i', 4), ('p', 2), ('s', 4), ('m', 1)]
print '*'*10, ' with multiprocessing ', '*'*10
def test(k, multi_dict):
multi_dict[k] += 1
if __name__ == '__main__':
pool = Pool(processes=4)
mgr = Manager()
multi_d = mgr.dict()
for k in s:
pool.apply_async(test, (k, multi_d))
# Mark pool as closed -- no more tasks can be added.
pool.close()
# Wait for tasks to exit
pool.join()
# Output results
print multi_d.items() #FAIL
print '*'*10, ' with multiprocessing and process module like on python site example', '*'*10
def test2(k, multi_dict2):
multi_dict2[k] += 1
if __name__ == '__main__':
manager = Manager()
multi_d2 = manager.dict()
for k in s:
p = Process(target=test2, args=(k, multi_d2))
p.start()
p.join()
print multi_d2 #FAIL
The first result works(because its not using multiprocessing
), but I'm having problems getting it to work with multiprocessing
. I'm not sure how to solve it but I think there might be due to it not being synced(and joining the results later) or maybe because within multiprocessing
I cannot figure how to set defaultdict(int)
to the dictionary.
Any help or suggestions on how to get this to work would be great!
You can subclass BaseManager
and register additional types for sharing. You need to provide a suitable proxy type in cases where the default AutoProxy
-generated type does not work. For defaultdict
, if you only need to access the attributes that are already present in dict
, you can use DictProxy
.
from multiprocessing import Pool
from multiprocessing.managers import BaseManager, DictProxy
from collections import defaultdict
class MyManager(BaseManager):
pass
MyManager.register('defaultdict', defaultdict, DictProxy)
def test(k, multi_dict):
multi_dict[k] += 1
if __name__ == '__main__':
pool = Pool(processes=4)
mgr = MyManager()
mgr.start()
multi_d = mgr.defaultdict(int)
for k in 'mississippi':
pool.apply_async(test, (k, multi_d))
pool.close()
pool.join()
print multi_d.items()
这篇关于将defaultdict与多处理一起使用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!