How to utilize all cores with python multiprocessing


Question

I have been fiddling with Python's multiprocessing for upwards of an hour now, trying to parallelize a rather complex graph-traversal function using Process and Manager:

import networkx as nx
import csv
import time 
from operator import itemgetter
import os
import multiprocessing as mp

cutoff = 1

exclusionlist = ["cpd:C00024"]

DG = nx.read_gml("KeggComplete.gml", relabel = True)

for exclusion in exclusionlist:
    DG.remove_node(exclusion)

# checks if the 'memorizedPaths' directory exists and, if not, creates it
fn = os.path.join(os.path.dirname(__file__), 'memorizedPaths' + str(cutoff+1))
if not os.path.exists(fn):
    os.makedirs(fn)

manager = mp.Manager()
memorizedPaths = manager.dict()
filepaths = manager.dict()
degreelist = sorted(DG.degree_iter(),key=itemgetter(1),reverse=True)

def _all_simple_paths_graph(item, DG, cutoff, memorizedPaths, filepaths):
    source = item[0]
    uniqueTreePaths = []
    if cutoff < 1:
        return
    visited = [source]
    stack = [iter(DG[source])]
    while stack:
        children = stack[-1]
        child = next(children, None)
        if child is None:
            stack.pop()
            visited.pop()
        elif child in memorizedPaths:
            for path in memorizedPaths[child]:
                newPath = (tuple(visited) + tuple(path))
                if (len(newPath) <= cutoff) and (len(set(visited) & set(path)) == 0):
                    uniqueTreePaths.append(newPath)
            continue
        elif len(visited) < cutoff:
            if child not in visited:
                visited.append(child)
                stack.append(iter(DG[child]))
                if visited not in uniqueTreePaths:
                    uniqueTreePaths.append(tuple(visited))
        else: #len(visited) == cutoff:
            if (visited not in uniqueTreePaths) and (child not in visited):
                uniqueTreePaths.append(tuple(visited + [child]))
            stack.pop()
            visited.pop()
    #writes the absolute path of the node path file into the hash table
    filepaths[source] = str(fn) + "/" + str(source) +"path.txt"
    with open (filepaths[source], "wb") as csvfile2:
        writer = csv.writer(csvfile2, delimiter=' ', quotechar='|')
        for path in uniqueTreePaths:
            writer.writerow(path)
    memorizedPaths[source] = uniqueTreePaths

############################################################################

start = time.clock()
if __name__ == '__main__':
    for item in degreelist:
        test = mp.Process(target=_all_simple_paths_graph, args=(item, DG, cutoff, memorizedPaths, filepaths))
        test.start()
        test.join()
end = time.clock()
print (end-start)

Currently - through luck and magic - it works (sort of). My problem is that I'm only using 12 of my 24 cores.

Can someone explain why this might be the case? Perhaps my code isn't the best multiprocessing solution, or is it a feature of my architecture [Intel Xeon CPU E5-2640 @ 2.50GHz x18 running on Ubuntu 13.04 x64]?

I managed to get:

p = mp.Pool()
for item in degreelist:
    p.apply_async(_all_simple_paths_graph, args=(item, DG, cutoff, memorizedPaths, filepaths))
p.close()
p.join()

working; however, it's VERY SLOW! So I assume I'm using the wrong function for the job. Hopefully this helps clarify exactly what I'm trying to accomplish!

The .map attempt:

from functools import partial

partialfunc = partial(_all_simple_paths_graph, DG=DG, cutoff=cutoff, memorizedPaths=memorizedPaths, filepaths=filepaths)
p = mp.Pool()
for item in processList:
    processVar = p.map(partialfunc, xrange(len(processList)))   
p.close()
p.join()

It works, but it's slower than a single core. Time to optimize!

Answer

Too much is piling up here to address in comments, so, where mp is multiprocessing:

mp.cpu_count() should return the number of processors. But test it: some platforms are funky, and this info isn't always easy to get. Python does the best it can.

If you start 24 processes, they'll do exactly what you tell them to do ;-) Looks like mp.Pool() would be most convenient for you. You pass the number of processes you want to create to its constructor; mp.Pool(processes=None) will use mp.cpu_count() for the number of processors.

Then you can use, for example, .imap_unordered(...) on your Pool instance to spread your degreelist across processes. Or maybe some other Pool method would work better for you - experiment.

If you can't bash the problem into Pool's view of the world, you could instead create an mp.Queue to use as a work queue, .put()'ing nodes (or slices of nodes, to reduce overhead) in the main program, and writing the workers to .get() work items off that queue. Ask if you need examples. Note that you need to put sentinel values (one per process) on the queue, after all the "real" work items, so that worker processes can test for the sentinel to know when they're done.

FYI, I like queues because they're more explicit. Many others like Pools better because they're more magical ;-)

Here's an executable prototype for you. This shows one way to use imap_unordered with Pool and chunksize that doesn't require changing any function signatures. Of course you'll have to plug in your real code ;-) Note that the init_worker approach allows passing "most of" the arguments only once per worker process, not once for every item in your degreelist. Cutting the amount of inter-process communication can be crucial for speed.

import multiprocessing as mp

def init_worker(mps, fps, cut):
    global memorizedPaths, filepaths, cutoff
    global DG

    print "process initializing", mp.current_process()
    memorizedPaths, filepaths, cutoff = mps, fps, cut
    DG = 1##nx.read_gml("KeggComplete.gml", relabel = True)

def work(item):
    _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths)

def _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths):
    pass # print "doing " + str(item)

if __name__ == "__main__":
    m = mp.Manager()
    memorizedPaths = m.dict()
    filepaths = m.dict()
    cutoff = 1 ##
    # use all available CPUs
    p = mp.Pool(initializer=init_worker, initargs=(memorizedPaths,
                                                   filepaths,
                                                   cutoff))
    degreelist = range(100000) ##
    for _ in p.imap_unordered(work, degreelist, chunksize=500):
        pass
    p.close()
    p.join()

I strongly advise running this exactly as-is, so you can see that it's blazing fast. Then add things to it a bit at a time, to see how that affects the time. For example, just adding

   memorizedPaths[item] = item

to _all_simple_paths_graph() slows it down enormously. Why? Because the dict gets bigger and bigger with each addition, and this process-safe dict has to be synchronized (under the covers) among all the processes. The unit of synchronization is "the entire dict" - there's no internal structure the mp machinery can exploit to do incremental updates to the shared dict.

If you can't afford this expense, then you can't use a Manager.dict() for this. Opportunities for cleverness abound ;-)

