使用Python和Windows中的Pathos多处理工具 [英] Working with pathos multiprocessing tool in Python and
问题描述
我有一些代码可以使用多处理库的pathos扩展来执行某些操作.我的问题是如何使用更复杂的辅助函数-在本例中为New_PP
.我应该如何格式化缓冲线以处理我的辅助函数需要的字典才能给我结果. Python默认将字典设置为全局变量,但是在辅助函数的范围内,我得到与该字典(access_dict
)相关的错误,因此我该如何发送字典或确保它对我的辅助线程可用.
Nchunks = 10
thpool = pathos.multiprocessing.ThreadingPool()
mppool = pathos.multiprocessing.ProcessingPool()
Lchunk = int(len(readinfiles) / Nchunks)
filechunks = chunks(readinfiles, 10)
for fnames in filechunks:
files = (open(name, 'r') for name in fnames)
res = thpool.map(mppool.map, [New_PP]*len(fnames), files)
print res[0][0]
和worker函数:
def New_PP(line):
split_line = line.rstrip()
if len(split_line) > 1:
access_dict[4] ....
如何在access_dict
处获取worker函数?
我还尝试将函数包装在一个类中,如下所示:
class MAPPP:
def New_PP(self, line):
self.mytype = access_dict
return my_type
def __init__(self, value_dict):
self.access_dict = access_dict
和:
mapp = MAPPP(value_dict)
print mapp.value_dict
res = thpool.map(mppool.map, [mapp.New_PP]*len(fnames), files)
但是我遇到同样的问题.
这里发生了一些问题:
-
您上面的代码有很多错误/错别字.
-
当您发送
mapp.New_PP
时,它会复制mapp.New_PP
...,因此它不会在实例之间共享access_dict
,因为这些实例是在不同处理器上的不同解释器会话中创建和销毁的.
以下内容可能会更清楚地说明...
>>> class MAPPP(object):
... access_dict = {}
... def __init__(self, value_dict):
... MAPPP.access_dict.update(value_dict)
... return
... def New_PP(self, line):
... MAPPP.access_dict[line] = len(line)
... return len(line)
...
>>>
>>> mapp = MAPPP({})
>>> mapp.access_dict
{}
>>> import pathos
>>> thpool = pathos.multiprocessing.ThreadingPool()
>>> mppool = pathos.multiprocessing.ProcessingPool()
>>> fnames = ['foo.txt', 'bar.txt']
>>> files = (open(name, 'r') for name in fnames)
>>> res = thpool.map(mppool.map, [mapp.New_PP]*len(fnames), files)
>>> res
[[21, 21, 21, 21, 21, 21, 20, 21, 19], [17, 18, 17, 17, 50, 82, 98]]
>>> mapp.access_dict
{}
那怎么了?逐行读取文件……并计算每行的长度……并返回到主过程.但是,在主进程中,行和长度的写入未添加到属于mapp
的mapp.access_dict
实例中……这是因为mapp
没有被传递到其他线程和处理器...已被复制.因此,它确实起作用了……并将行添加到了该类的dict的相关副本中……但是,当进程/线程完成其工作并将行号传回map
然后关闭时,它们便被垃圾回收了. /p>
在pathos
或multiprocessing
中,目前没有超级轻松"的方法来执行此操作.但是,如果使用multiprocessing
和ctypes
,则可以这样做.
您可能要考虑使用具有共享内存和/或代理的multiprocessing
:
作为pathos
的作者,我计划使该功能更高级些……但是目前没有时间表.
I have some code for performing some operations using the pathos extension of the multiprocessing library. My question is how to employ a more complex worker function - in this case named New_PP
. How should I format the thpool line to handle a dictionary that my worker function requires in order to give me a result. Python defaults dictionaries to global variables, but within the scope of the worker function I get an error related to this dictionary (access_dict
) not being found, so how can I send in the dictionary or ensure it is available to my worker thread.
Nchunks = 10
thpool = pathos.multiprocessing.ThreadingPool()
mppool = pathos.multiprocessing.ProcessingPool()
Lchunk = int(len(readinfiles) / Nchunks)
filechunks = chunks(readinfiles, 10)
for fnames in filechunks:
files = (open(name, 'r') for name in fnames)
res = thpool.map(mppool.map, [New_PP]*len(fnames), files)
print res[0][0]
And the worker function:
def New_PP(line):
split_line = line.rstrip()
if len(split_line) > 1:
access_dict[4] ....
How can the worker function get at access_dict
?
I have also tried to wrap up my function inside a class as follows:
class MAPPP:
def New_PP(self, line):
self.mytype = access_dict
return my_type
def __init__(self, value_dict):
self.access_dict = access_dict
and:
mapp = MAPPP(value_dict)
print mapp.value_dict
res = thpool.map(mppool.map, [mapp.New_PP]*len(fnames), files)
However I get the same issue.
There are a few issues going on here:
your code above has a bunch of errors/typos.
when you send off
mapp.New_PP
, it makes a copy ofmapp.New_PP
… so it does not shareaccess_dict
between instances because those instances are created and destroyed in a different interpreter session on a different processor.
Maybe the following will demonstrate a bit more clearly...
>>> class MAPPP(object):
... access_dict = {}
... def __init__(self, value_dict):
... MAPPP.access_dict.update(value_dict)
... return
... def New_PP(self, line):
... MAPPP.access_dict[line] = len(line)
... return len(line)
...
>>>
>>> mapp = MAPPP({})
>>> mapp.access_dict
{}
>>> import pathos
>>> thpool = pathos.multiprocessing.ThreadingPool()
>>> mppool = pathos.multiprocessing.ProcessingPool()
>>> fnames = ['foo.txt', 'bar.txt']
>>> files = (open(name, 'r') for name in fnames)
>>> res = thpool.map(mppool.map, [mapp.New_PP]*len(fnames), files)
>>> res
[[21, 21, 21, 21, 21, 21, 20, 21, 19], [17, 18, 17, 17, 50, 82, 98]]
>>> mapp.access_dict
{}
So what happened? The files were read, line by line… and the lengths of each line were computed… and returned to the main process. However, the write of the line and length were not added to the instance of mapp.access_dict
that belongs to mapp
in the main process… and that's because mapp
is not passed to the other threads and processors… it's copied. So, it did work… and the lines were added to the relevant copies of the class's dict… but then they were garbage collected when the process/thread did its job and passed the line numbers back through the map
then shut down.
There is no "super-easy" way to do this in pathos
or multiprocessing
right now. However, you can do it if you use multiprocessing
and ctypes
.
You might want to look at working with multiprocessing
with shared memory and/or proxies:
- How to synchronize a python dict with multiprocessing
- How can I share a class between processes?
- How to combine Pool.map with Array (shared memory) in Python multiprocessing?
As pathos
author, I plan to make the functionality to do the above more high-level… but don't have a timeline for that at the moment.
这篇关于使用Python和Windows中的Pathos多处理工具的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!