Multiprocessing an iterable in Python

Question

I am trying to split the following code to allow for multiprocessing in Python, and it is becoming a really frustrating task for me. I am new to multiprocessing and have read the documentation and as many samples as I could find, but I still have not found a solution that runs on all CPU cores at once.

I would like to split the iterables into quarters and compute the test on them in parallel.

My single-threaded example:

import itertools as it
import numpy as np

wmod = np.array([[0,1,2],[3,4,5],[6,7,3]])
pmod = np.array([[0,1,2],[3,4,5],[6,7,3]])

plines1 = it.product(wmod[0],wmod[1],wmod[2])
plines2 = it.product(pmod[0],pmod[1],pmod[2])

check = .915
result = []

for A, B in zip(plines1,plines2):
    test = (sum(B)+10)/(sum(A)+12)
    if test > check:
        # collect each matching (A, B) pair (np.append would flatten them)
        result.append([A,B])
print('results: ',result)

I realize this is a very small example with a pair of 3x3 matrices, but I would like to apply it to a pair of larger matrices, which take about an hour to compute. I appreciate any advice.
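
One way to get the split into quarters the question asks for is multiprocessing.Pool, which is a different tool from the queue-based answer below. This is a minimal sketch that assumes wmod and pmod always have the same shape, so the k-th A and the k-th B come from the same index tuple. Instead of slicing the lazy iterator, each task fixes a contiguous slice of the first row, and itertools.product over that slice yields exactly one contiguous block of the full sequence. check_block and quarters are hypothetical helper names. They sit at module level so that Pool can pickle them.

```python
import itertools as it
import multiprocessing as mp

import numpy as np

# Same data and threshold as the question, at module level so that
# worker processes can see them under every start method.
wmod = np.array([[0, 1, 2], [3, 4, 5], [6, 7, 3]])
pmod = np.array([[0, 1, 2], [3, 4, 5], [6, 7, 3]])
check = 0.915


def check_block(bounds):
    """Test every (A, B) pair whose first factor comes from wmod[0][lo:hi] / pmod[0][lo:hi]."""
    lo, hi = bounds
    # assumption: wmod and pmod have the same shape, so both blocks line up
    plines1 = it.product(wmod[0][lo:hi], *wmod[1:])
    plines2 = it.product(pmod[0][lo:hi], *pmod[1:])
    matches = []
    for A, B in zip(plines1, plines2):
        if (sum(B) + 10) / (sum(A) + 12) > check:
            matches.append((A, B))
    return matches


def quarters(n, parts=4):
    """Split range(n) into at most `parts` contiguous (lo, hi) slices."""
    step = -(-n // parts)  # ceiling division
    return [(lo, min(lo + step, n)) for lo in range(0, n, step)]


if __name__ == "__main__":
    blocks = quarters(len(wmod[0]))
    with mp.Pool(processes=4) as pool:
        # pool.map returns the per-block lists in the order of `blocks`
        per_block = pool.map(check_block, blocks)
    result = [pair for block in per_block for pair in block]
    print('results: ', result)
```

Because product varies its first factor slowest, concatenating the blocks in order should reproduce the order of the single-threaded loop. With only three values in the first row, this example yields three blocks rather than four. For larger matrices, passing a larger parts value than the core count gives the pool smaller tasks to balance.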

Recommended answer

I would suggest using queues to pass the items of your iterable to the worker processes. Something like this:

import multiprocessing as mp
import numpy as np
import itertools as it


def worker(in_queue, out_queue):
    check = 0.915
    # take (A, B) pairs off the input queue until the 'STOP' sentinel arrives
    for A, B in iter(in_queue.get, 'STOP'):
        test = (sum(B)+10)/(sum(A)+12)
        if test > check:
            out_queue.put([A,B])
        else:
            # placeholder, so that every task produces exactly one reply
            out_queue.put('')

if __name__ == "__main__":
    wmod = np.array([[0,1,2],[3,4,5],[6,7,3]])
    pmod = np.array([[0,1,2],[3,4,5],[6,7,3]])

    plines1 = it.product(wmod[0],wmod[1],wmod[2])
    plines2 = it.product(pmod[0],pmod[1],pmod[2])

    # determine length of your iterator (3*3*3 = 27 pairs here)
    counts = len(wmod[0]) * len(wmod[1]) * len(wmod[2])

    # setup iterator (called "pairs" so that it does not shadow the itertools alias "it")
    pairs = zip(plines1,plines2)

    # bounded input queue: put() blocks while it is full, so the
    # whole iterable never has to sit in the queue at once
    in_queue = mp.Queue(maxsize=1000)
    out_queue = mp.Queue()

    # setup workers
    numProc = 2
    process = [mp.Process(target=worker,
                          args=(in_queue, out_queue), daemon=True) for x in range(numProc)]

    # run processes
    for p in process:
        p.start()

    # fill the queue (blocks whenever in_queue is full)
    for pair in pairs:
        in_queue.put(pair)

    # one stop signal per worker
    for p in process:
        in_queue.put('STOP')

    # get exactly one reply per task; do this before join(), because a process
    # that still has data buffered in out_queue cannot finish
    results = [out_queue.get() for idx in range(counts)]

    # wait for processes to finish
    for p in process:
        p.join()

    # drop the '' placeholders, keeping only the matching [A, B] pairs
    results = [r for r in results if not isinstance(r, str)]
    print(results)

    print('finished')

However, you have to determine in advance how long your task list will be, because the main process collects exactly one reply per task from out_queue.
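
Because the task list is a product of the matrix rows, its length can be computed without materializing the iterator. This is a minimal sketch that assumes the same wmod and pmod arrays as above. product_length is a hypothetical helper name, and math.prod needs Python 3.8 or later. Since zip() stops at the shorter input, the sketch takes the smaller of the two lengths.

```python
import math

import numpy as np

wmod = np.array([[0, 1, 2], [3, 4, 5], [6, 7, 3]])
pmod = np.array([[0, 1, 2], [3, 4, 5], [6, 7, 3]])


def product_length(rows):
    """Number of tuples itertools.product(*rows) yields: the product of the row lengths."""
    return math.prod(len(row) for row in rows)


# zip() stops at the shorter iterator, so take the smaller of the two lengths
counts = min(product_length(wmod), product_length(pmod))
print(counts)  # 27
```

For the 3x3 example this gives 3 * 3 * 3 = 27 tasks. For an n-row matrix with m values per row it grows as m**n, which is why the larger inputs take so long.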