无法将多个参数发送到concurrrent.futures.Executor.map() [英] Unable to send multiple arguments to concurrrent.futures.Executor.map()
问题描述
我正在尝试结合这两个SO答案中提供的解决方案-将多个参数传递给parallel.futures.Executor.map?.我有一个numpy数组,将其分成多个部分,我希望将每个块发送到一个单独的线程,并将一个附加参数与原始数组的块一起发送.此附加参数是一个常量,不会更改. performCalc是一个带有两个参数的函数-一个是原始numpy数组的一部分,另一个是常量.
I am trying to combine the solutions provided in both of these SO answers - Using threading to slice an array into chunks and perform calculation on each chunk and reassemble the returned arrays into one array and Pass multiple parameters to concurrent.futures.Executor.map?. I have a numpy array that I chunk into segments and I want each chunk to be sent to a separate thread and an additional argument to be sent along with the chunk of the original array. This additional argument is a constant and will not change. The performCalc is a function that will take two arguments -one the chunk of the original numpy array and a constant.
我尝试过的第一个解决方案
First solution I tried
import psutil
import numpy as np
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial
def main():
testThread()
def testThread():
minLat = -65.76892
maxLat = 66.23587
minLon = -178.81404
maxLon = 176.2949
latGrid = np.arange(minLat,maxLat,0.05)
lonGrid = np.arange(minLon,maxLon,0.05)
gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]
n_jobs = psutil.cpu_count(logical=False)
chunk = np.array_split(grid_points,n_jobs,axis=0)
x = ThreadPoolExecutor(max_workers=n_jobs)
maxDistance = 4.3
func = partial(performCalc,chunk)
args = [chunk,maxDistance]
# This prints 4.3 twice although there are four cores in the system
results = x.map(func,args)
# This prints 4.3 four times correctly
results1 = x.map(performTest,chunk)
def performCalc(chunk,maxDistance):
print(maxDistance)
return chunk
def performTest(chunk):
print("test")
main()
因此,即使系统中的内核数为4,performCalc()也会打印两次4.3.而performTest()则可以正确打印四次测试.我无法找出发生此错误的原因.
So performCalc() prints 4.3 twice even though the number of cores in the system is 4. While performTest() prints test four times correctly. I am not able to figure out the reason for this error.
我也确定为itertools.partial调用设置的方法不正确.
Also I am sure the way I set up the for itertools.partial call is incorrect.
1)原始numpy数组有四个块.
1) There are four chunks of the original numpy array.
2)每个块都应与maxDistance配对并发送到performCalc()
2) Each chunk is to be paired with maxDistance and sent to performCalc()
3)将有四个线程将打印maxDistance并将返回总结果的一部分,这些结果将在一个数组中返回
3) There will be four threads that will print maxDistance and will return parts of the total result which will be returned in one array
我要去哪里错了?
更新
我也尝试使用lambda方法
I tried using the lambda approach as well
results = x.map(lambda p:performCalc(*p),args)
但这不会打印任何内容.
but this prints nothing.
推荐答案
Using the solution provided by user mkorvas as shown here - How to pass a function with more than one argument to python concurrent.futures.ProcessPoolExecutor.map()? I was able to solve my problem as shown in the solution here -
import psutil
import numpy as np
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial
def main():
testThread()
def testThread():
minLat = -65.76892
maxLat = 66.23587
minLon = -178.81404
maxLon = 176.2949
latGrid = np.arange(minLat,maxLat,0.05)
lonGrid = np.arange(minLon,maxLon,0.05)
print(latGrid.shape,lonGrid.shape)
gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]
print(grid_points.shape)
n_jobs = psutil.cpu_count(logical=False)
chunk = np.array_split(grid_points,n_jobs,axis=0)
x = ThreadPoolExecutor(max_workers=n_jobs)
maxDistance = 4.3
func = partial(performCalc,maxDistance)
results = x.map(func,chunk)
def performCalc(maxDistance,chunk):
print(maxDistance)
return chunk
main()
一个人显然需要做的(我不知道为什么,也许有人可以在另一个答案中阐明)是您需要将输入顺序切换到函数performCalc()
What apparently one needs to do(and I do not know why and maybe somebody can clarify in another answer) is you need to switch the order of input to the function performCalc()
如此处所示-
def performCalc(maxDistance,chunk):
print(maxDistance)
return chunk
这篇关于无法将多个参数发送到concurrrent.futures.Executor.map()的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!