Working with pathos multiprocessing tool in Python and Windows


Problem description



I have some code that performs some operations using the pathos extension of the multiprocessing library. My question is how to employ a more complex worker function - in this case named New_PP. How should I format the thpool line to handle a dictionary that my worker function requires in order to give me a result? Python treats a dictionary defined at module level as a global variable, but within the scope of the worker function I get an error saying that this dictionary (access_dict) is not found, so how can I send in the dictionary, or ensure it is available to my worker thread?

    import pathos

    Nchunks = 10
    thpool = pathos.multiprocessing.ThreadingPool()
    mppool = pathos.multiprocessing.ProcessingPool()
    Lchunk = int(len(readinfiles) / Nchunks)
    filechunks = chunks(readinfiles, 10)
    for fnames in filechunks:
            files = (open(name, 'r') for name in fnames)
            res = thpool.map(mppool.map, [New_PP]*len(fnames), files)
            print res[0][0]

And the worker function:

def New_PP(line):
    split_line = line.rstrip()
    if len(split_line) > 1:
      access_dict[4] ....

How can the worker function get at access_dict?

I have also tried to wrap up my function inside a class as follows:

class MAPPP:
    def New_PP(self, line):
        self.mytype = access_dict
        return my_type

    def __init__(self, value_dict):
        self.access_dict = access_dict

and:

mapp = MAPPP(value_dict)
print mapp.value_dict
res = thpool.map(mppool.map, [mapp.New_PP]*len(fnames), files)

However I get the same issue.

Solution

There are a few issues going on here:

  1. Your code above has a bunch of errors/typos.

  2. When you send off mapp.New_PP, it makes a copy of mapp.New_PP… so it does not share access_dict between instances, because those instances are created and destroyed in different interpreter sessions on different processors.

Maybe the following will demonstrate a bit more clearly...

>>> class MAPPP(object):
...   access_dict = {}
...   def __init__(self, value_dict):
...     MAPPP.access_dict.update(value_dict)
...     return
...   def New_PP(self, line):
...     MAPPP.access_dict[line] = len(line)
...     return len(line)
... 
>>> 
>>> mapp = MAPPP({})
>>> mapp.access_dict
{}
>>> import pathos
>>> thpool = pathos.multiprocessing.ThreadingPool()
>>> mppool = pathos.multiprocessing.ProcessingPool()
>>> fnames = ['foo.txt', 'bar.txt']
>>> files = (open(name, 'r') for name in fnames)
>>> res = thpool.map(mppool.map, [mapp.New_PP]*len(fnames), files)
>>> res
[[21, 21, 21, 21, 21, 21, 20, 21, 19], [17, 18, 17, 17, 50, 82, 98]]
>>> mapp.access_dict
{}

So what happened? The files were read, line by line… the length of each line was computed… and the lengths were returned to the main process. However, the lines and lengths were not added to the mapp.access_dict that belongs to mapp in the main process… and that's because mapp is not passed to the other threads and processors… it's copied. So, it did work… the lines were added to the relevant copies of the class's dict in each worker… but those copies were garbage collected once each process/thread finished its job, passed the line lengths back through the map, and shut down.
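
Since values returned through the map do make it back intact, one workaround is to return the key/value pairs themselves and merge them into the dict in the main process. Here is a minimal sketch continuing the session above (MAPPP2 is a hypothetical variant, not part of the original code, and the generator of open files has to be re-created because the earlier one is exhausted):

>>> class MAPPP2(object):
...   access_dict = {}
...   def New_PP(self, line):
...     return (line, len(line))   # ship the data back instead of mutating state
... 
>>> mapp2 = MAPPP2()
>>> files = (open(name, 'r') for name in fnames)
>>> res = thpool.map(mppool.map, [mapp2.New_PP]*len(fnames), files)
>>> for pairs in res:              # one list of (line, length) pairs per file
...   MAPPP2.access_dict.update(pairs)
... 
>>> len(MAPPP2.access_dict) > 0    # the main process now holds the entries
True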

There is no "super-easy" way to do this in pathos or multiprocessing right now. However, you can do it if you use multiprocessing and ctypes.

You might want to look at working with multiprocessing using shared memory and/or proxies.
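
A proxy dict from multiprocessing.Manager keeps the dict in a single server process, so worker writes become visible in the main process, which is closest to what the question asks for. Another minimal standard-library sketch (record_length and the sample lines are made up for illustration):

    from functools import partial
    from multiprocessing import Manager, Pool

    def record_length(shared, line):
        shared[line] = len(line)   # the write travels through the proxy to the manager
        return len(line)

    if __name__ == '__main__':     # the guard is required on Windows
        manager = Manager()
        shared = manager.dict()    # a proxy to a dict living in the manager process
        pool = Pool()
        lengths = pool.map(partial(record_length, shared), ['foo', 'barbaz'])
        pool.close()
        pool.join()
        print(dict(shared))        # e.g. {'foo': 3, 'barbaz': 6}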

As the pathos author, I plan to make the functionality to do the above more high-level… but I don't have a timeline for that at the moment.
