How to use multiprocessing to parallelize two calls to the same function, with different arguments, in a for loop?


Question

In a for loop, I am calling a function twice, but with different argument sets (argSet1, argSet2) that change on each iteration of the for loop. I want to parallelize this operation, since one set of arguments causes the called function to run fast while the other set causes a slow run. Note that I do not want to have two for loops for this operation. I also have another requirement: each of these calls will itself execute some parallel operations, so because of my limited computational resources I do not want more than one run with argSet1 or argSet2 in flight at any time. Making sure the function is running with both argument sets simultaneously will help me utilize the CPU cores as much as possible. Here's how I would normally do it without parallelization:

def myFunc(arg1, arg2):
    if arg1:
        print('do something that does not take too long')
    else:
        print('do something that takes long')

for i in range(10):
    argSet1 = arg1Storage[i]
    argSet2 = arg2Storage[i]
    myFunc(*argSet1)
    myFunc(*argSet2)

This will definitely not take advantage of the computational resources that I have. Here is my attempt at parallelizing the operations:

from multiprocessing import Process

def myFunc(arg1, arg2):
    if arg1:
        print('do something that does not take too long')
    else:
        print('do something that takes long')

for i in range(10):
    argSet1 = arg1Storage[i]
    argSet2 = arg2Storage[i]
    p1 = Process(target=myFunc, args=argSet1)
    p1.start()
    p2 = Process(target=myFunc, args=argSet2)
    p2.start()

However, this way a new process is launched for each function and its respective arguments on every iteration, and things become extremely slow. Given my limited knowledge of multiprocessing, I tried to improve things a bit by adding p1.join() and p2.join() to the end of the for loop, but this still causes a slowdown, since p1 is done much faster and everything waits until p2 is done. I also thought about using multiprocessing.Value to do some communication with the functions, but then I would have to add a while loop inside the function for each of the calls, which slows everything down again. I wonder if someone can offer a practical solution?
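
For reference, the join-based attempt described above looks roughly like this (a sketch only; myFunc, arg1Storage, and arg2Storage are the ones from the snippets above, and the argument sets are assumed to be tuples):

from multiprocessing import Process

# Sketch of the join-per-iteration attempt: p1 finishes early, but the
# loop still blocks until p2 is done, so one core sits idle for most of
# each iteration.
for i in range(10):
    argSet1 = arg1Storage[i]
    argSet2 = arg2Storage[i]
    p1 = Process(target=myFunc, args=argSet1)
    p2 = Process(target=myFunc, args=argSet2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()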

Answer

Since I built this answer in patches, scroll down for my best solution to this problem.

You need to specify exactly how you want things to run. As far as I can tell, you want at most two processes running at any time, but also at least two; additionally, you do not want the heavy calls to hold up the fast ones. One simple, non-optimal way to run this is:

from multiprocessing import Process

def func(counter, somearg):
    # Busy-loop to simulate work; counter controls how long one call takes.
    j = 0
    for i in range(counter):
        j += i
    print(somearg)

def loop(counter, arglist):
    for i in range(10):
        func(counter, arglist[i])

# One process handles all the heavy calls, another all the light ones.
heavy = Process(target=loop, args=[1000000, ['heavy' + str(i) for i in range(10)]])
light = Process(target=loop, args=[500000, ['light' + str(i) for i in range(10)]])
heavy.start()
light.start()
heavy.join()
light.join()

The output here is (for one example run):

light0
heavy0
light1
light2
heavy1
light3
light4
heavy2
light5
light6
heavy3
light7
light8
heavy4
light9
heavy5
heavy6
heavy7
heavy8
heavy9

You can see that the last part is sub-optimal, since you end with a sequence of heavy runs - which means one process is running instead of two.

There is an easy way to optimize this, if you can estimate how much longer the heavy process runs. If it is twice as slow, as here, just run 7 iterations of heavy first, join the light process, and have it run the remaining 3.

Another way is to run the heavy work in pairs of processes, so at first you have 3 processes until the fast one ends, and then continue with 2, as in the sketch below.
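
A minimal sketch of that idea, assuming the three-argument loop(counter, amount, arglist) helper defined in the next snippet (the names heavy_a/heavy_b and the 5/5 split are illustrative):

from multiprocessing import Process

# Split the heavy iterations across two processes from the start: three
# processes run until the light one finishes, after which the two heavy
# halves continue on the freed cores.
heavy_a = Process(target=loop, args=[1000000, 5, ['heavyA' + str(i) for i in range(5)]])
heavy_b = Process(target=loop, args=[1000000, 5, ['heavyB' + str(i) for i in range(5)]])
light = Process(target=loop, args=[500000, 10, ['light' + str(i) for i in range(10)]])
for p in (heavy_a, heavy_b, light):
    p.start()
for p in (heavy_a, heavy_b, light):
    p.join()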

The main point is to separate the heavy and light calls into entirely different processes - so while the fast calls complete one after the other, you can work on your slow stuff. Once the fast ones end, it is up to you how elaborate the continuation should be, but I think for now estimating how to break up the heavy calls is good enough. This is it for my example:

from multiprocessing import Process

def func(counter, somearg):
    j = 0
    for i in range(counter):
        j += i
    print(somearg)

def loop(counter, amount, arglist):
    for i in range(amount):
        func(counter, arglist[i])

# Run 7 heavy iterations alongside all 10 light ones; once the light
# process finishes, start the remaining 3 heavy iterations.
heavy1 = Process(target=loop, args=[1000000, 7, ['heavy1' + str(i) for i in range(7)]])
light = Process(target=loop, args=[500000, 10, ['light' + str(i) for i in range(10)]])
heavy2 = Process(target=loop, args=[1000000, 3, ['heavy2' + str(i) for i in range(7, 10)]])
heavy1.start()
light.start()
light.join()
heavy2.start()
heavy1.join()
heavy2.join()

Output:

light0
heavy10
light1
light2
heavy11
light3
light4
heavy12
light5
light6
heavy13
light7
light8
heavy14
light9
heavy15
heavy27
heavy16
heavy28
heavy29

Much better utilization. You can of course make this more advanced by sharing a queue for the slow runs, so that when the fast workers are done they can join in as workers on the slow queue; but for only two different kinds of calls this may be overkill (though it is not much harder using the queue). The best solution:

from multiprocessing import Queue, Process
import queue

def func(index, counter, somearg):
    # Busy-loop to simulate work; counter controls how long one call takes.
    j = 0
    for i in range(counter):
        j += i
    print("Worker", index, ':', somearg)

def worker(index, q):
    # Pull tasks off the shared queue until it is drained. All tasks are
    # enqueued before the workers start, so a non-blocking get is enough.
    try:
        while True:
            f, args = q.get(block=False)
            f(index, *args)
    except queue.Empty:
        pass

if __name__ == '__main__':
    q = Queue()
    for i in range(10):
        q.put((func, (500000, 'light' + str(i))))
        q.put((func, (1000000, 'heavy' + str(i))))

    nworkers = 2
    workers = []
    for i in range(nworkers):
        workers.append(Process(target=worker, args=(i, q)))
        workers[-1].start()
    q.close()
    for w in workers:
        w.join()

This is the best and most scalable solution for what you want. Output:

Worker 0 : light0
Worker 0 : light1
Worker 1 : heavy0
Worker 1 : light2
Worker 0 : heavy1
Worker 0 : light3
Worker 1 : heavy2
Worker 1 : light4
Worker 0 : heavy3
Worker 0 : light5
Worker 1 : heavy4
Worker 1 : light6
Worker 0 : heavy5
Worker 0 : light7
Worker 1 : heavy6
Worker 1 : light8
Worker 0 : heavy7
Worker 0 : light9
Worker 1 : heavy8
Worker 0 : heavy9
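
As a side note, the same two-worker pattern can also be expressed with the standard-library multiprocessing.Pool, which manages the task queue and worker loop internally. This is not part of the original answer, just an equivalent sketch; the work function is the same busy-loop as above, minus the worker index:

from multiprocessing import Pool

def func(counter, somearg):
    # Same busy-loop as above, without the worker index.
    j = 0
    for i in range(counter):
        j += i
    print(somearg)

if __name__ == '__main__':
    # Interleave light and heavy argument sets, as in the queue version.
    tasks = []
    for i in range(10):
        tasks.append((500000, 'light' + str(i)))
        tasks.append((1000000, 'heavy' + str(i)))
    # chunksize=1 hands out one task at a time, so a worker that finishes
    # a light task immediately picks up the next item.
    with Pool(processes=2) as pool:
        pool.starmap(func, tasks, chunksize=1)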

