Python Multiprocessing concurrency using Manager, Pool and a shared list not working


Problem description

I am learning Python multiprocessing, and I am trying to use this feature to populate a list with all the files present on the OS. However, the code that I wrote only executes sequentially.

#!/usr/bin/python
import os
import multiprocessing
tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]  # Get the top-level directory names inside "/"
manager = multiprocessing.Manager()
files = manager.list()


def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

mp = [multiprocessing.Process(target=get_files, args=(tld[x],))
      for x in range(len(tld))]

for i in mp:
    i.start()
    i.join()
print len(files)

When I checked the process tree, I could see only a single child process spawned. (man pstree says {} denotes a child process spawned by the parent.)

---bash(10949)---python(12729)-+-python(12730)---{python}(12752)
                               `-python(12750)

What I was looking for was to spawn a process for each tld directory and populate the shared list files; that would be around 10-15 processes depending on the number of directories. What am I doing wrong?
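(As it turns out, the loop above calls i.join() immediately after i.start(), so each child must finish before the next one is launched, which is why only one worker shows up at a time. A minimal sketch of the concurrent variant, reusing the same tld and get_files setup, starts every process first and joins them only afterwards:)

import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()
files = manager.list()

def get_files(x):
    for root, dirs, names in os.walk(x):
        for name in names:
            files.append(os.path.join(root, name))

mp = [multiprocessing.Process(target=get_files, args=(d,)) for d in tld]
for p in mp:
    p.start()            # launch every worker before waiting on any of them
for p in mp:
    p.join()             # now wait for all of them together
print len(files)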

Update:

I used multiprocessing.Pool to create the worker processes, and this time the processes are spawned, but it gives errors when I try to use multiprocessing.Pool.map(). I was referring to the following code from the Python docs:

from multiprocessing import Pool
def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3])) 

Following that example, I rewrote the code as

import os
import multiprocessing
tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()
pool = multiprocessing.Pool(processes=len(tld))
print pool
files = manager.list()
def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))
pool.map(get_files, [x for x in tld])
pool.close()
pool.join()
print len(files)

This time it is forking multiple processes.

---bash(10949)---python(12890)-+-python(12967)
                               |-python(12968)
                               |-python(12970)
                               |-python(12971)
                               |-python(12972)
                               ---snip---

But the code is giving errors:

Process PoolWorker-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
AttributeError: 'module' object has no attribute 'get_files'

(the same traceback is repeated by each pool worker)


What am I doing wrong here, and why does the get_files() function error out?

Recommended answer

It's simply because you instantiate your pool before defining the function get_files:

import os
import multiprocessing

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
manager = multiprocessing.Manager()

files = manager.list()
def get_files(x):
    for root, dir, file in os.walk(x):
        for name in file:
            files.append(os.path.join(root, name))

pool = multiprocessing.Pool(processes=len(tld)) # Instantiate the pool here

pool.map(get_files, [x for x in tld])
pool.close()
pool.join()
print len(files)

The overall idea is that the instant you start a process (or instantiate a Pool), the memory of the main process is forked. So any definition made in the main process after that fork will not exist in the subprocess.
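To see that ordering rule in isolation, here is a minimal sketch built on the Pool example from the docs quoted above (square is just a placeholder worker used for illustration):

import multiprocessing

def square(x):                      # defined at module level, before any worker exists
    return x * x

if __name__ == '__main__':
    # The worker processes are created here; they inherit a copy of this module
    # in which square is already defined, so pool.map can resolve it by name.
    pool = multiprocessing.Pool(2)
    print pool.map(square, [1, 2, 3])   # -> [1, 4, 9]
    pool.close()
    pool.join()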

If you want shared memory, you can use the threading library instead, but then you will have some other issues to deal with (cf. the Global Interpreter Lock).
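For what it's worth, a minimal sketch of that threading variant, reusing the same top-level-directory walk as in the question: threads share the parent's memory, so a plain list works and no Manager is needed (and since os.walk is mostly I/O-bound, the GIL is less of a bottleneck here):

import os
import threading

tld = [os.path.join("/", f) for f in os.walk("/").next()[1]]
files = []                          # ordinary list: all threads see the same object

def get_files(x):
    for root, dirs, names in os.walk(x):
        for name in names:
            files.append(os.path.join(root, name))

threads = [threading.Thread(target=get_files, args=(d,)) for d in tld]
for t in threads:
    t.start()                       # start them all first...
for t in threads:
    t.join()                        # ...then wait for every one of them
print len(files)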
