Using multiprocessing for finding network paths
Question
I am currently using the networkx function *all_simple_paths* to find all paths within a network G, for a given set of source and target nodes.
On larger/denser networks this process is incredibly intensive.
I would like to know if multiprocessing could conceivably be used on this problem, and if anybody had any ideas on how that might be implemented, through creating a Pool etc.
import networkx as nx

G = nx.complete_graph(8)
sources = [1, 2]
targets = [5, 6, 7]

for target in targets:
    for source in sources:
        for path in nx.all_simple_paths(G, source=source, target=target, cutoff=None):
            print(path)
Many thanks in advance for any suggestions you may have!
Answer
Here is a version which uses a collection of worker processes. Each worker gets source, target pairs from a Queue, and collects the paths in a list. When all the paths have been found, the results are put in an output Queue, and collated by the main process.
import networkx as nx
import multiprocessing as mp
import random
import sys
import itertools as IT
import logging

logger = mp.log_to_stderr(logging.DEBUG)

def worker(inqueue, output):
    result = []
    count = 0
    for pair in iter(inqueue.get, sentinel):
        source, target = pair
        for path in nx.all_simple_paths(G, source=source, target=target,
                                        cutoff=None):
            result.append(path)
            count += 1
            if count % 10 == 0:
                logger.info('{c}'.format(c=count))
    output.put(result)

def test_workers():
    result = []
    inqueue = mp.Queue()
    for source, target in IT.product(sources, targets):
        inqueue.put((source, target))
    procs = [mp.Process(target=worker, args=(inqueue, output))
             for i in range(mp.cpu_count())]
    for proc in procs:
        proc.daemon = True
        proc.start()
    for proc in procs:
        inqueue.put(sentinel)
    for proc in procs:
        result.extend(output.get())
    for proc in procs:
        proc.join()
    return result

def test_single_worker():
    result = []
    count = 0
    for source, target in IT.product(sources, targets):
        for path in nx.all_simple_paths(G, source=source, target=target,
                                        cutoff=None):
            result.append(path)
            count += 1
            if count % 10 == 0:
                logger.info('{c}'.format(c=count))
    return result

sentinel = None
seed = 1
m = 1
N = 1340 // m
G = nx.gnm_random_graph(N, int(1.7 * N), seed)
random.seed(seed)
sources = [random.randrange(N) for i in range(340 // m)]
targets = [random.randrange(N) for i in range(1000 // m)]
output = mp.Queue()

if __name__ == '__main__':
    test_workers()
    # test_single_worker()
    # assert set(map(tuple, test_workers())) == set(map(tuple, test_single_worker()))
test_workers uses multiprocessing, test_single_worker uses a single process.
Running test.py does not raise an AssertionError, so it looks like both functions return the same result (at least for the limited tests I've run).
Here are the timing results:
% python -mtimeit -s'import test as t' 't.test_workers()'
10 loops, best of 3: 6.71 sec per loop
% python -mtimeit -s'import test as t' 't.test_single_worker()'
10 loops, best of 3: 12.2 sec per loop
So test_workers was able to achieve a 1.8x speedup over test_single_worker on a 2-core system in this case. Hopefully, the code will scale well for your real problem too. I'd be interested to know the result.
Some points of interest:

- Calling pool.apply_async on a short-lived function is very slow, because too much time is spent passing arguments in, and results out, through queues rather than using the CPUs to do useful computation.
- It is better to collect results in a list and put the full result in the output Queue, rather than putting results in output one at a time. Each object put in the Queue is pickled, and it is quicker to pickle one large list than it is many small lists.
- I think it is safer to print from only one process, so the print statements do not step on each other (resulting in mangled output).
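The pickling point can be checked directly with a rough, machine-dependent measurement, using `pickle.dumps` on its own rather than a real Queue, purely for illustration:

```python
import pickle
import timeit

# A stand-in for a worker's list of found paths.
items = [[i, i + 1, i + 2] for i in range(10000)]

# Pickle the whole list once, as output.put(result) does.
one_big = timeit.timeit(lambda: pickle.dumps(items), number=100)

# Pickle each element separately, as putting paths on the
# Queue one at a time would do.
many_small = timeit.timeit(
    lambda: [pickle.dumps(item) for item in items], number=100)

print(one_big < many_small)
```

On a typical machine the single large pickle should come out faster, since the per-call overhead of `pickle.dumps` (and, on a real Queue, the per-put locking and I/O) is paid once instead of once per item.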