Using multiprocessing for finding network paths


Problem Description

I am currently using the networkx function *all_simple_paths* to find all paths within a network G, for a given set of source and target nodes.

On larger/denser networks this process is incredibly intensive.
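To see why this gets so expensive, note that the number of simple paths between two nodes can grow factorially with graph size: in the complete graph K_n there are sum over k of (n-2)!/(n-2-k)! simple paths between any fixed pair of nodes, one term for each choice of k ordered intermediate nodes. A quick sanity check against networkx (the helper function name here is illustrative, not part of networkx):

```python
import math

import networkx as nx

def simple_path_count_complete(n):
    # Closed-form count of simple paths between two fixed nodes of K_n:
    # pick k of the other n - 2 nodes as intermediates, in order,
    # for k = 0 .. n - 2.
    return sum(math.factorial(n - 2) // math.factorial(n - 2 - k)
               for k in range(n - 1))

for n in range(4, 9):
    G = nx.complete_graph(n)
    counted = sum(1 for _ in nx.all_simple_paths(G, 0, n - 1))
    # The enumerated count and the closed form agree for each n.
    print(n, counted, simple_path_count_complete(n))
```

Even at n = 8 there are already 1957 simple paths per node pair, which is why denser graphs blow up quickly.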

I would like to know if multiprocessing could conceivably be used on this problem, and if anybody had any ideas on how that might be implemented, through creating a Pool etc.

import networkx as nx

G = nx.complete_graph(8)
sources = [1,2]
targets = [5,6,7]

for target in targets:
    for source in sources:
        for path in nx.all_simple_paths(G, source=source, target=target, cutoff=None):
            print(path)

Many thanks in advance for any suggestions you may have!

Answer

Here is a version which uses a collection of worker processes. Each worker gets source, target pairs from a Queue, and collects the paths in a list. When all the paths have been found, the results are put in an output Queue, and collated by the main process.

import networkx as nx
import multiprocessing as mp
import random
import sys
import itertools as IT
import logging
logger = mp.log_to_stderr(logging.DEBUG)


def worker(inqueue, output):
    result = []
    count = 0
    # Keep pulling (source, target) pairs until the sentinel arrives.
    for pair in iter(inqueue.get, sentinel):
        source, target = pair
        for path in nx.all_simple_paths(G, source=source, target=target,
                                        cutoff=None):
            result.append(path)
            count += 1
            if count % 10 == 0:
                logger.info('{c}'.format(c=count))
    # Put the whole list at once; pickling one large list is cheaper
    # than pickling many small ones.
    output.put(result)

def test_workers():
    result = []
    inqueue = mp.Queue()
    for source, target in IT.product(sources, targets):
        inqueue.put((source, target))
    procs = [mp.Process(target=worker, args=(inqueue, output))
             for i in range(mp.cpu_count())]
    for proc in procs:
        proc.daemon = True
        proc.start()
    # One sentinel per worker process signals each one to stop.
    for proc in procs:
        inqueue.put(sentinel)
    for proc in procs:
        result.extend(output.get())
    for proc in procs:
        proc.join()
    return result

def test_single_worker():
    result = []
    count = 0
    for source, target in IT.product(sources, targets):
        for path in nx.all_simple_paths(G, source=source, target=target,
                                        cutoff=None):
            result.append(path)
            count += 1
            if count % 10 == 0:
                logger.info('{c}'.format(c=count))

    return result

sentinel = None

seed = 1
m = 1
N = 1340//m
G = nx.gnm_random_graph(N, int(1.7*N), seed)
random.seed(seed)
sources = [random.randrange(N) for i in range(340//m)]
targets = [random.randrange(N) for i in range(1000//m)]
output = mp.Queue()

if __name__ == '__main__':
    test_workers()
    # test_single_worker()
    # assert set(map(tuple, test_workers())) == set(map(tuple, test_single_worker()))

test_workers uses multiprocessing; test_single_worker uses a single process.

Running test.py does not raise an AssertionError, so it looks like both functions return the same result (at least for the limited tests I've run).

Here are the timing results:

% python -mtimeit -s'import test as t' 't.test_workers()'
10 loops, best of 3: 6.71 sec per loop

% python -mtimeit -s'import test as t' 't.test_single_worker()'
10 loops, best of 3: 12.2 sec per loop

So test_workers was able to achieve a 1.8x speedup over test_single_worker on a 2-core system in this case. Hopefully, the code will scale well for your real problem too. I'd be interested to know the result.

Some points of interest:

  • Calling pool.apply_async on a short-lived function is very slow, because too much time is spent passing arguments in, and results out through queues rather than using the CPUs to do useful computation.
  • It is better to collect results in a list and put the full result in the output Queue rather than putting results in output one at a time. Each object put in the Queue is pickled, and it is quicker to pickle one large list than it is many small lists.
  • I think it is safer to print from only one process, so the print statements do not step on each other (resulting in mangled output).
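Following those points, a Pool-based variant is also workable as long as each task returns its whole list of paths in one go, so only one object per (source, target) pair is pickled back to the parent rather than one per path. A minimal sketch, assuming a fork-based start method (the Linux default) so workers inherit G; the function names are illustrative, not from networkx:

```python
import itertools
import multiprocessing as mp

import networkx as nx

G = nx.complete_graph(8)

def paths_for_pair(source, target):
    # Runs in a worker process. Returning the full list means the result
    # is pickled back to the parent as one object per task.
    return list(nx.all_simple_paths(G, source=source, target=target))

def all_paths(sources, targets):
    pairs = list(itertools.product(sources, targets))
    with mp.Pool() as pool:
        # starmap unpacks each (source, target) tuple into the arguments.
        per_pair = pool.starmap(paths_for_pair, pairs)
    # Flatten the per-pair lists into one list of paths.
    return [path for sublist in per_pair for path in sublist]

if __name__ == '__main__':
    paths = all_paths([1, 2], [5, 6, 7])
    print(len(paths))  # 11742 simple paths for these pairs in K_8
```

This keeps the per-task work coarse enough that queue traffic stays small relative to the path enumeration itself, which is the same trade-off the worker/Queue version above is making.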

