Unable to send multiple arguments to concurrent.futures.Executor.map()


Problem description

I am trying to combine the solutions from two SO answers: "Using threading to slice an array into chunks and perform calculation on each chunk and reassemble the returned arrays into one array" and "Pass multiple parameters to concurrent.futures.Executor.map?". I have a numpy array that I split into chunks. I want each chunk sent to a separate thread, together with an additional argument. This additional argument is a constant and does not change. performCalc is a function that takes two arguments: a chunk of the original numpy array and the constant.

First solution I tried:

import psutil
import numpy as np
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def main():
    testThread()

def testThread():

    minLat = -65.76892
    maxLat =  66.23587
    minLon =  -178.81404
    maxLon =  176.2949
    latGrid = np.arange(minLat,maxLat,0.05)
    lonGrid = np.arange(minLon,maxLon,0.05)

    gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
    grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]

    n_jobs = psutil.cpu_count(logical=False)

    chunk = np.array_split(grid_points,n_jobs,axis=0)


    x = ThreadPoolExecutor(max_workers=n_jobs)
    maxDistance = 4.3
    func = partial(performCalc,chunk)
    args = [chunk,maxDistance]
    # This prints 4.3 twice although there are four cores in the system
    results = x.map(func,args)
    # This prints "test" four times correctly
    results1 = x.map(performTest,chunk)

def performCalc(chunk,maxDistance):
    print(maxDistance)
    return chunk

def performTest(chunk):
    print("test")

main()

So performCalc() prints 4.3 twice even though the system has four cores, while performTest() correctly prints "test" four times. I cannot figure out the reason for this.
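A note on this observation: Executor.map(func, iterable) submits one call per element of the iterable, regardless of the number of cores or workers. The list args = [chunk, maxDistance] has two elements, so only two calls are made. The sketch below reproduces that shape with small stand-in data; record_call and the list contents are hypothetical names used only for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def record_call(first, second):
    # Return the type names of both arguments so each call is visible.
    return (type(first).__name__, type(second).__name__)

chunks = [[1, 2], [3, 4], [5, 6], [7, 8]]  # stand-in for the four numpy chunks
max_distance = 4.3

# Same shape as the question: the whole list of chunks is bound as the first argument...
func = partial(record_call, chunks)
# ...and map() then iterates over a two-element list.
args = [chunks, max_distance]

with ThreadPoolExecutor(max_workers=4) as executor:
    calls = list(executor.map(func, args))

print(len(calls))  # one call per element of args
print(calls)
```

Each call receives the full list of chunks as its first argument; the second argument is the list of chunks again on the first call and the constant on the second. No call ever sees a single chunk.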

I am also fairly sure that the way I set up the functools.partial call is incorrect.

1) There are four chunks of the original numpy array.

2) Each chunk is to be paired with maxDistance and sent to performCalc().

3) Four threads will each print maxDistance and return part of the total result, and the parts will be combined into one array.

Where am I going wrong?

Update

I tried the lambda approach as well:

results = x.map(lambda p:performCalc(*p),args)

but this prints nothing.
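One likely reason nothing is printed: with args = [chunk, maxDistance], performCalc(*p) receives the wrong arguments and raises a TypeError inside the worker thread, and Executor.map() only re-raises an exception when the corresponding result is retrieved from the iterator it returns. Since results is never iterated, the error stays hidden. A minimal sketch with stand-in data (perform_calc and chunks mirror the question but are simplified assumptions):

```python
from concurrent.futures import ThreadPoolExecutor

def perform_calc(chunk, max_distance):
    print(max_distance)
    return chunk

chunks = [[1, 2], [3, 4], [5, 6], [7, 8]]  # stand-in for the four numpy chunks
max_distance = 4.3
args = [chunks, max_distance]

with ThreadPoolExecutor(max_workers=4) as executor:
    # No exception is raised here: failures are stored with the pending results.
    results = executor.map(lambda p: perform_calc(*p), args)
    try:
        # Retrieving the results is what surfaces the stored exception.
        list(results)
        error = None
    except TypeError as exc:
        error = exc

print(type(error).__name__)
```

Iterating over the result of map(), for example with list(), is a simple way to make such failures visible while debugging.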

Recommended answer

Using the solution provided by user mkorvas in "How to pass a function with more than one argument to python concurrent.futures.ProcessPoolExecutor.map()?", I was able to solve my problem as shown below:

import psutil
import numpy as np
import sys
from concurrent.futures import ThreadPoolExecutor
from functools import partial

def main():
    testThread()

def testThread():

    minLat = -65.76892
    maxLat =  66.23587
    minLon =  -178.81404
    maxLon =  176.2949
    latGrid = np.arange(minLat,maxLat,0.05)
    lonGrid = np.arange(minLon,maxLon,0.05)
    print(latGrid.shape,lonGrid.shape)
    gridLon,gridLat = np.meshgrid(latGrid,lonGrid)
    grid_points = np.c_[gridLon.ravel(),gridLat.ravel()]
    print(grid_points.shape)
    n_jobs = psutil.cpu_count(logical=False)
    chunk = np.array_split(grid_points,n_jobs,axis=0)
    x = ThreadPoolExecutor(max_workers=n_jobs)

    maxDistance = 4.3
    # Bind the constant as the first positional argument of performCalc
    func = partial(performCalc,maxDistance)

    # map() then passes each chunk as the second argument, one call per chunk
    results = x.map(func,chunk)

def performCalc(maxDistance,chunk):

    print(maxDistance)
    return chunk

main()
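The code above never consumes results. Assuming the goal stated in the question, reassembling the per-chunk results into one array, a sketch of that last step could look like this. perform_calc is a placeholder that returns its chunk unchanged, and the small grid_points array stands in for the real grid; np.concatenate is one reasonable way to rejoin the pieces, not necessarily what the author used.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial

import numpy as np

def perform_calc(max_distance, chunk):
    # Placeholder calculation: return the chunk unchanged.
    return chunk

grid_points = np.arange(20.0).reshape(10, 2)  # small stand-in for the real grid
chunks = np.array_split(grid_points, 4, axis=0)

with ThreadPoolExecutor(max_workers=4) as executor:
    # map() yields results in the same order as the input chunks.
    results = list(executor.map(partial(perform_calc, 4.3), chunks))

# Stack the per-chunk results back into a single array, row-wise.
combined = np.concatenate(results, axis=0)
print(combined.shape)
```

Using the executor as a context manager also shuts the pool down once the work is finished.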

What one apparently needs to do (I do not know why; maybe somebody can clarify in another answer) is switch the order of the parameters of performCalc(), as shown here:

      def performCalc(maxDistance,chunk):

          print(maxDistance)
          return chunk
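As for why the order matters: functools.partial fills positional arguments from the left, so partial(performCalc, maxDistance) binds maxDistance to the first parameter, and map() supplies each chunk as the next one. Two alternatives that keep the original (chunk, maxDistance) order are binding the constant by keyword, or passing a second iterable built with itertools.repeat. The sketch below uses stand-in lists instead of numpy arrays, and the snake_case names are illustrative only.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from itertools import repeat

def perform_calc(chunk, max_distance):
    # Original parameter order from the question: chunk first, constant second.
    return (len(chunk), max_distance)

chunks = [[1, 2], [3, 4], [5, 6], [7, 8]]  # stand-in for the four numpy chunks
max_distance = 4.3

with ThreadPoolExecutor(max_workers=4) as executor:
    # Option 1: bind the constant by keyword; map() fills in chunk positionally.
    by_keyword = list(
        executor.map(partial(perform_calc, max_distance=max_distance), chunks)
    )
    # Option 2: map() accepts several iterables and pairs them up;
    # repeat() supplies the same constant for every chunk.
    by_repeat = list(executor.map(perform_calc, chunks, repeat(max_distance)))

print(by_keyword == by_repeat)
```

Both options make one call per chunk, which matches the pairing described in the question.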
