How to pass parameters other than data through the pool.imap() function for multiprocessing in Python?


Problem description

I am working on hyperspectral images. To reduce the noise in the image I am using a wavelet transformation via the pywt package. When I do this normally (serial processing) it works smoothly. But when I try to implement parallel processing across multiple cores for the wavelet transformation of the image, I have to pass certain parameters, namely:

  1. the wavelet family
  2. the threshold value
  3. the thresholding technique (hard/soft)

But I am not able to pass these parameters through the pool object: when I use pool.imap() I can pass only the data as an argument. When I use pool.apply_async() instead, it takes much more time and the order of the output is not preserved either. I am adding the code here for reference:

import matplotlib.pyplot as plt
import numpy as np
import multiprocessing as mp
import os
import time
from math import log10, sqrt
import pywt
import tifffile

def spec_trans(d,wav_fam,threshold_val,thresh_type):
  
  data=np.array(d,dtype=np.float64)
  data_dec=decomposition(data,wav_fam)
  data_t=thresholding(data_dec,threshold_val,thresh_type)
  data_rec=reconstruction(data_t,wav_fam)
  
  return data_rec

if __name__ == '__main__':
    
    #input
    X=tifffile.imread('data/Classification/university.tif')
    #take parameters
    threshold_val=float(input("Enter the value for image thresholding: "))
    print("The available wavelet functions:",pywt.wavelist())
    wav_fam=input("Choose a wavelet function for transformation: ")
    threshold_type=['hard','soft']
    print("The available thresholding techniques:",threshold_type)
    thresh_type=input("Choose a type for thresholding technique: ")

    start=time.time()
    p = mp.Pool(4)
    jobs=[]
    for dataBand in xmp:
      jobs.append(p.apply_async(spec_trans,args=(dataBand,wav_fam,threshold_val,thresh_type)))
    transformedX=[]
    for jobBit in jobs:
      transformedX.append(jobBit.get())
    end=time.time()
    p.close()
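The snippet calls decomposition, thresholding and reconstruction without showing them. Purely for illustration, here is a minimal sketch of what such helpers might look like with pywt's multilevel 1-D API; the function bodies, the decomposition level, and the choice to leave the approximation coefficients untouched are all assumptions, not the asker's actual code:

```python
import numpy as np
import pywt

def decomposition(data, wav_fam, level=2):
    # multilevel 1-D discrete wavelet decomposition:
    # returns [cA_n, cD_n, ..., cD_1]
    return pywt.wavedec(data, wav_fam, level=level)

def thresholding(coeffs, threshold_val, thresh_type):
    # threshold only the detail coefficients,
    # leaving the approximation coefficients untouched
    return [coeffs[0]] + [pywt.threshold(c, threshold_val, mode=thresh_type)
                          for c in coeffs[1:]]

def reconstruction(coeffs, wav_fam):
    # invert the multilevel decomposition
    return pywt.waverec(coeffs, wav_fam)
```

With a threshold of 0 the round trip is (up to floating-point error) the identity, which is a handy sanity check for helpers like these.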


Also, when I use the 'soft' technique for thresholding, I get the following warning:

C:\Users\Sawon\anaconda3\lib\site-packages\pywt\_thresholding.py:25: RuntimeWarning: invalid value encountered in multiply
  thresholded = data * thresholded
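The warning comes from the multiplication step inside pywt's soft-thresholding routine; one common trigger is non-finite values (NaN/inf) in the coefficient arrays, e.g. from bad pixels in the image. A minimal sketch of sanitising the data before thresholding, assuming that replacing non-finite values with zero is acceptable for your data:

```python
import numpy as np
import pywt

coeffs = np.array([0.0, np.nan, 1.5, -2.0])
clean = np.nan_to_num(coeffs)   # NaN -> 0.0, +/-inf -> large finite values
out = pywt.threshold(clean, value=0.5, mode='soft')
```

Whether nan_to_num is the right repair depends on where the non-finite values come from; a cheap first check is np.isnan(X).any() on the input image itself.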

The results of serial execution and parallel execution should be more or less the same, but here I am getting slightly different results. Any suggestion to modify the code would be helpful. Thanks.

Answer

Assuming wav_fam, threshold_val and thresh_type do not vary from call to call, first arrange for these arguments to be the first arguments to worker function spec_trans:

def spec_trans(wav_fam, threshold_val, thresh_type, d):

Now I don't see where in your pool-creation block you have defined xmp, but presumably this is an iterable. You need to modify this code as follows:

from functools import partial

def compute_chunksize(pool_size, iterable_size):
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

if __name__ == '__main__':

    X=tifffile.imread('data/Classification/university.tif')
    #take parameters
    threshold_val=float(input("Enter the value for image thresholding: "))
    print("The available wavelet functions:",pywt.wavelist())
    wav_fam=input("Choose a wavelet function for transformation: ")
    threshold_type=['hard','soft']
    print("The available thresholding techniques:",threshold_type)
    thresh_type=input("Choose a type for thresholding technique: ")

    start=time.time()
    p = mp.Pool(4)
    # first 3 arguments to spec_trans will be wav_fam, threshold_val and thresh_type 
    worker = partial(spec_trans, wav_fam, threshold_val, thresh_type)
    suitable_chunksize = compute_chunksize(4, len(xmp))
    transformedX = list(p.imap(worker, xmp, chunksize=suitable_chunksize))
    end=time.time()

To obtain improved performance over apply_async, you must use a "suitable chunksize" value with imap. The function compute_chunksize can compute such a value based on the size of your pool, which is 4, and the size of the iterable being passed to imap, which would be len(xmp). If xmp is small enough that the computed chunksize value is 1, I don't really see how imap would be significantly more performant than apply_async.
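To make the arithmetic concrete, the helper (restated here so the sketch is self-contained) splits the iterable into roughly 4 * pool_size chunks, rounding up; the example sizes below are made up:

```python
def compute_chunksize(pool_size, iterable_size):
    # ceil(iterable_size / (4 * pool_size))
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

print(compute_chunksize(4, 100))  # 100 tasks, pool of 4 -> chunksize 7
print(compute_chunksize(4, 10))   # small iterable -> chunksize 1
```

With a chunksize of 1 each task is dispatched individually, which is essentially what apply_async does, hence the caveat above.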

Of course, you might as well just use:

    transformedX = p.map(worker, xmp)

And let the pool compute its own suitable chunksize. imap has an advantage over map when the iterable is very large and not already a list: for map to compute a suitable chunksize it would first have to convert the iterable to a list just to get its length, and this could be memory-inefficient. But if you know the length (or approximate length) of the iterable, then by using imap you can explicitly set a chunksize without having to convert the iterable to a list. The other advantage of imap over map is that you can process the results of individual tasks as they become available, whereas with map you only get results when all the submitted tasks are complete.
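The laziness point can be sketched as follows, using the thread-backed multiprocessing.dummy.Pool (it exposes the same Pool API but avoids process-spawn issues in a short demo; square and the input sizes are invented for illustration). imap consumes the generator without needing its length, and yields results in submission order:

```python
from multiprocessing.dummy import Pool  # thread pool with the same API as mp.Pool

def square(x):
    return x * x

with Pool(2) as p:
    bands = (i for i in range(8))        # lazy iterable: has no __len__
    results = list(p.imap(square, bands, chunksize=2))
```

Passing the same generator to map would work too, but the pool would first materialise it into a list to size its chunks.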

Update

If you want to catch possible exceptions thrown by the individual tasks submitted to your worker function, then stick with imap and use the following code to iterate the results it returns:

    #transformedX = list(p.imap(worker, xmp, chunksize=suitable_chunksize))
    transformedX = []
    results = p.imap(worker, xmp, chunksize=suitable_chunksize)
    import traceback
    while True:
        try:
            result = next(results)
        except StopIteration: # no more results
            break
        except Exception as e:
            print('Exception occurred:', e)
            traceback.print_exc() # print stacktrace
        else:
            transformedX.append(result)
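A self-contained sketch of that pattern (again using the thread-backed multiprocessing.dummy.Pool so it runs as-is; the worker and the failing input are invented for illustration):

```python
from multiprocessing.dummy import Pool  # thread pool with the same API as mp.Pool

def risky_worker(x):
    # simulate one band that fails to transform
    if x == 3:
        raise ValueError('cannot transform band %d' % x)
    return x * 2

with Pool(2) as p:
    results_iter = p.imap(risky_worker, range(6))
    transformed, failures = [], []
    while True:
        try:
            transformed.append(next(results_iter))
        except StopIteration:       # all results consumed
            break
        except Exception as exc:    # the task's exception is re-raised here
            failures.append(exc)
```

The five good results come back in submission order, and the one failure is captured without aborting the loop.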
