Multiprocessing in Python: how to implement a loop over "apply_async" as "map_async" using a callback function


Problem description

I would like to integrate a system of differential equations for several parameter combinations using Python's multiprocessing module. So, the system should get integrated and the parameter combination should be stored as well as its index and the final value of one of the variables.

That works fine when I use apply_async, which is already faster than a simple for loop, but I fail to implement the same thing using map_async, which seems to be faster than apply_async. The callback function is never called and I have no clue why. Could anyone explain why this happens and how to get the same output using map_async instead of apply_async?

Here is my code:

from pylab import *
import multiprocessing as mp
from scipy.integrate import odeint
import time

#my system of differential equations
def myODE (yn,tvec,allpara):

    (x, y, z) = yn

    a, b = allpara['para']

    dx  = -x + a*y + x*x*y
    dy = b - a*y - x*x*y
    dz = x*y

    return (dx, dy, dz) 

#returns the index of the parameter combination, the parameters and the integrated solution
#this way I know which parameter combination belongs to which outcome in the asynch-case
def runMyODE(yn,tvec,allpara):
    return allpara['index'],allpara['para'],transpose(odeint(myODE, yn, tvec, args=(allpara,)))

#for reproducibility    
seed(0) 

#time settings for integration
dt = 0.01
tmax = 50
tval = arange(0,tmax,dt)

numVar = 3 #number of variables (x, y, z)
numPar = 2 #number of parameters (a, b)
numComb = 5 #number of parameter combinations

INIT = zeros((numComb,numVar)) #initial conditions will be stored here
PARA = zeros((numComb,numPar)) #parameter combinations for a and b will be stored here

#create some initial conditions and random parameters
for combi in range(numComb):

    INIT[combi,:] = append(10*rand(2),0) #initial conditions for x and y are randomly chosen, z is 0

    PARA[combi,:] = 10*rand(2) #parameter a and b are chosen randomly

#################################using loop over apply####################

#results will be stored in here
asyncResultsApply = []

#my callback function
def saveResultApply(result):
    # storing the index, a, b and the final value of z
    asyncResultsApply.append((result[0], result[1], result[2][2,-1]))

#start the multiprocessing part
pool = mp.Pool(processes=4)
for combi in range(numComb):
    pool.apply_async(runMyODE, args=(INIT[combi,:],tval,{'para': PARA[combi,:], 'index': combi}), callback=saveResultApply)
pool.close()
pool.join()

for res in asyncResultsApply:
    print res[0], res[1], res[2] #printing the index, a, b and the final value of z

#######################################using map#####################
#the only difference is that the for loop is replaced by a "map_async" call
print "\n\nnow using map\n\n"
asyncResultsMap = []

#my callback function which is never called
def saveResultMap(result):
    # storing the index, a, b and the final value of z
    asyncResultsMap.append((result[0], result[1], result[2][2,-1]))

pool = mp.Pool(processes=4)
pool.map_async(lambda combi: runMyODE(INIT[combi,:], tval, {'para': PARA[combi,:], 'index': combi}), range(numComb), callback=saveResultMap)
pool.close()
pool.join()

#this does not work yet
for res in asyncResultsMap:
    print res[0], res[1], res[2] #printing the index, a, b and the final value of z

Solution

If I understood you correctly, this stems from something that confuses people quite often. apply_async's callback is called after its single operation completes, and so is map_async's: it is not called for each element, but once with the entire list of results.
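A minimal sketch of that behaviour, not taken from the original post: the names square and callback_payloads and the pool_cls parameter are assumptions made for illustration. It records every value that map_async passes to its callback. The worker is a module-level function rather than a lambda, because the standard pickle module cannot serialize lambdas for transfer to worker processes.

```python
import multiprocessing as mp
from multiprocessing.pool import ThreadPool  # same interface as Pool, backed by threads


def square(n):
    # Module-level function, so it can be pickled and sent to a worker process.
    return n * n


def callback_payloads(pool_cls=mp.Pool):
    """Return a list of every value that map_async hands to its callback."""
    payloads = []
    with pool_cls(processes=2) as pool:
        async_result = pool.map_async(square, range(5), callback=payloads.append)
        async_result.wait()  # returns only after the callback has been invoked
    return payloads


if __name__ == "__main__":
    # One payload only: the complete, ordered list of results.
    print(callback_payloads())  # [[0, 1, 4, 9, 16]]
```

The callback fires a single time and receives the whole list, so a callback that indexes into one result tuple, as saveResultMap does, would need to loop over its argument instead. Passing ThreadPool as pool_cls runs the same check without starting processes.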

You are correct in noting that map is faster than a series of apply_async calls. If you want something to happen after each result, there are a few ways to go:

  1. You can effectively add the callback to the operation you want to be performed on each element, and map using that.

  2. You could use imap (or imap_unordered) in a loop, and do the callback within the loop body. Of course, this means that all will be performed in the parent process, but the nature of stuff written as callbacks means that's usually not a problem (it tends to be cheap functions). YMMV.


For example, suppose you have the functions f and cb, and you'd like to map f over es, calling cb after each operation. Then you could either do:

def look_ma_no_cb(e):
    r = f(e)
    cb(r)
    return r

p = multiprocessing.Pool()
p.map(look_ma_no_cb, es)

or

for r in p.imap(f, es):
    cb(r)
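Applied to a setup like the one in the question, the imap variant could look roughly like the sketch below. It is an illustration under assumptions, not the asker's code: run_task is a hypothetical, cheap stand-in for runMyODE, and collect and pool_cls are made-up names. Each task carries its own index, so results can be matched to their parameters even when they arrive out of order.

```python
import multiprocessing as mp
from multiprocessing.pool import ThreadPool  # same interface as Pool, backed by threads


def run_task(task):
    # Hypothetical stand-in for runMyODE: takes (index, (a, b)) and returns
    # the index, the parameters and one number computed from them.
    index, (a, b) = task
    return index, (a, b), a * b


def collect(tasks, pool_cls=mp.Pool):
    """Gather (index, parameters, value) tuples, handling each as it arrives."""
    results = []
    with pool_cls(processes=2) as pool:
        # imap_unordered yields each result as soon as it is ready, so the
        # loop body acts as a per-result callback running in the parent.
        for index, para, value in pool.imap_unordered(run_task, tasks):
            results.append((index, para, value))
    return sorted(results)  # restore index order


if __name__ == "__main__":
    tasks = [(i, (float(i), 2.0)) for i in range(5)]
    for row in collect(tasks):
        print(row)
```

One difference between the two variants is worth keeping in mind: with a process pool, look_ma_no_cb runs cb inside the worker, so a cb that appends to a list there does not change the parent's list, whereas the loop body of the imap variant runs in the parent.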
