Speed up sampling of kernel estimate


Question

Here's a MWE of a much larger code I'm using. Basically, it performs a Monte Carlo integration over a KDE (kernel density estimate) for all values located below a certain threshold (the integration method was suggested over at this question BTW: Integrate 2D kernel density estimate).

import numpy as np
from scipy import stats
import time

# Generate some random two-dimensional data:
def measure(n):
    "Measurement model, return two coupled measurements."
    m1 = np.random.normal(size=n)
    m2 = np.random.normal(scale=0.5, size=n)
    return m1+m2, m1-m2

# Get data.
m1, m2 = measure(20000)
# Define limits.
xmin = m1.min()
xmax = m1.max()
ymin = m2.min()
ymax = m2.max()

# Perform a kernel density estimate on the data.
x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
values = np.vstack([m1, m2])
kernel = stats.gaussian_kde(values)

# Define point below which to integrate the kernel.
x1, y1 = 0.5, 0.5

# Get kernel value for this point.
tik = time.time()
iso = kernel((x1,y1))
print 'iso: ', time.time()-tik

# Sample from KDE distribution (Monte Carlo process).
tik = time.time()
sample = kernel.resample(size=1000)
print 'resample: ', time.time()-tik

# Filter the sample leaving only values for which
# the kernel evaluates to less than what it does for
# the (x1, y1) point defined above.
tik = time.time()
insample = kernel(sample) < iso
print 'filter/sample: ', time.time()-tik

# Integrate for all values below iso.
tik = time.time()
integral = insample.sum() / float(insample.shape[0])
print 'integral: ', time.time()-tik

The output looks something like this:

iso:  0.00259208679199
resample:  0.000817060470581
filter/sample:  2.10829401016
integral:  4.2200088501e-05

This clearly means that the filter/sample call is consuming almost all of the code's running time. I have to run this block of code iteratively several thousand times, so it gets pretty time consuming.

Is there any way to speed up the filtering/sampling process?

Here's a slightly more realistic MWE of my actual code with Ophion's multiprocessing solution written into it:

import numpy as np
from scipy import stats
from multiprocessing import Pool

def kde_integration(m_list):

    m1, m2 = [], []
    for item in m_list:
        # Color data.
        m1.append(item[0])
        # Magnitude data.
        m2.append(item[1])

    # Define limits.
    xmin, xmax = min(m1), max(m1)
    ymin, ymax = min(m2), max(m2)

    # Perform a kernel density estimate on the data:
    x, y = np.mgrid[xmin:xmax:100j, ymin:ymax:100j]
    values = np.vstack([m1, m2])
    kernel = stats.gaussian_kde(values)

    out_list = []

    for point in m_list:

        # Compute the point below which to integrate.
        iso = kernel((point[0], point[1]))

        # Sample KDE distribution
        sample = kernel.resample(size=1000)

        # Create definition.
        def calc_kernel(samp):
            return kernel(samp)

        # Choose number of cores and split input array.
        cores = 4
        torun = np.array_split(sample, cores, axis=1)

        # Calculate.
        pool = Pool(processes=cores)
        results = pool.map(calc_kernel, torun)

        # Reintegrate and calculate results.
        insample_mp = np.concatenate(results) < iso

        # Integrate for all values below iso.
        integral = insample_mp.sum() / float(insample_mp.shape[0])

        out_list.append(integral)

    return out_list


# Generate some random two-dimensional data:
def measure(n):
    "Measurement model, return two coupled measurements."
    m1 = np.random.normal(size=n)
    m2 = np.random.normal(scale=0.5, size=n)
    return m1+m2, m1-m2

# Create list to pass.
m_list = []
for i in range(60):
    m1, m2 = measure(5)
    m_list.append(m1.tolist())
    m_list.append(m2.tolist())

# Call KDE integration function.
print 'Integral result: ', kde_integration(m_list)

The solution presented by Ophion works great on the original code I presented, but fails with the following error in this version:

Integral result: Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I tried moving the calc_kernel function around, since one of the answers to the question Multiprocessing: How to use Pool.map on a function defined in a class? states that "the function that you give to map() must be accessible through an import of your module"; but I still can't get this code to work.
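For reference, here's a minimal sketch of what that constraint means in practice (the data setup below is illustrative, not my actual code): the worker has to be defined at module level so the child processes can find it through an import, and everything passed to it has to be picklable. scipy's gaussian_kde instances pickle fine, which is also what the pool.map(kernel, torun) fix further down relies on:

import numpy as np
from scipy import stats
from multiprocessing import Pool

# Module-level worker: Pool.map pickles a *reference* to this function,
# so it must be importable by name in the child processes.
def calc_kernel(args):
    kernel, samp = args
    return kernel(samp)

if __name__ == '__main__':
    values = np.vstack([np.random.normal(size=1000),
                        np.random.normal(size=1000)])
    kernel = stats.gaussian_kde(values)
    sample = kernel.resample(size=1000)

    # Ship the (picklable) kernel to the workers as part of the argument.
    chunks = np.array_split(sample, 4, axis=1)
    pool = Pool(processes=4)
    results = pool.map(calc_kernel, [(kernel, c) for c in chunks])
    pool.close()
    pool.join()
    insample = np.concatenate(results)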

Any help would be greatly appreciated.

Implementing Ophion's suggestion to remove the calc_kernel function and simply using:

results = pool.map(kernel, torun)

works to get rid of the PicklingError but now I see that if I create an initial m_list of anything more than around 62-63 items I get this error:

Traceback (most recent call last):
  File "~/gauss_kde_temp.py", line 67, in <module>
    print 'Integral result: ', kde_integration(m_list)
  File "~/gauss_kde_temp.py", line 38, in kde_integration
    pool = Pool(processes=cores)
  File "/usr/lib/python2.7/multiprocessing/__init__.py", line 232, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 161, in __init__
    self._result_handler.start()
  File "/usr/lib/python2.7/threading.py", line 494, in start
    _start_new_thread(self.__bootstrap, ())
thread.error: can't start new thread

Since my actual list in my real implementation of this code can have up to 2000 items, this issue renders the code unusable. Line 38 is this one:

pool = Pool(processes=cores)

so apparently it has something to do with the number of cores I'm using?

This question, "Can't start new thread error" in Python, suggests using:

threading.active_count()

to check the number of threads I have going when I get that error. I checked and it always crashes when it reaches 374 threads. How can I code around this issue?
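Looking at the MWE above, a likely culprit is that a brand-new Pool is created inside the for point in m_list: loop on every iteration and never shut down; each Pool starts its own internal handler threads, so they pile up until the OS refuses to start more. A sketch of the usual fix (same setup as above, using the pool.map(kernel, torun) variant) is to create one pool before the loop and release it afterwards:

import numpy as np
from scipy import stats
from multiprocessing import Pool

def kde_integration(m_list, cores=4):
    m1 = [item[0] for item in m_list]
    m2 = [item[1] for item in m_list]
    kernel = stats.gaussian_kde(np.vstack([m1, m2]))

    pool = Pool(processes=cores)  # one pool, reused for every point
    out_list = []
    try:
        for point in m_list:
            iso = kernel((point[0], point[1]))
            sample = kernel.resample(size=1000)
            torun = np.array_split(sample, cores, axis=1)
            results = pool.map(kernel, torun)
            insample_mp = np.concatenate(results) < iso
            out_list.append(insample_mp.sum() / float(insample_mp.shape[0]))
    finally:
        pool.close()  # stop accepting new tasks
        pool.join()   # wait for workers and handler threads to exit
    return out_list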

Here's the new question dealing with this last issue: Thread error: can't start new thread

Answer

Probably the easiest way to speed this up is to parallelize kernel(sample):

Take the following chunk of code:

tik = time.time()
insample = kernel(sample) < iso
print 'filter/sample: ', time.time()-tik
#filter/sample:  1.94065904617

Change it to use multiprocessing:

from multiprocessing import Pool
tik = time.time()

# Create definition.
def calc_kernel(samp):
    return kernel(samp)

# Choose number of cores and split input array.
cores = 4
torun = np.array_split(sample, cores, axis=1)

# Calculate.
pool = Pool(processes=cores)
results = pool.map(calc_kernel, torun)

# Reintegrate and calculate results.
insample_mp = np.concatenate(results) < iso

print 'multiprocessing filter/sample: ', time.time()-tik
#multiprocessing filter/sample:  0.496874094009

Double check they are returning the same answer:

print np.all(insample==insample_mp)
#True

A 3.9x improvement on 4 cores. I'm not sure what you're running this on, but after about 6 processors the input array size is not large enough to get considerable gains. For example, using 20 processors it's only about 5.8x faster.
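If you want to check where the speedup flattens out on your own machine, a quick benchmark sketch along these lines (it assumes the kernel, sample, and iso variables from the snippets above) times the parallel evaluation for a few core counts:

import time
import numpy as np
from multiprocessing import Pool

for cores in (1, 2, 4, 8):
    torun = np.array_split(sample, cores, axis=1)
    pool = Pool(processes=cores)
    tik = time.time()
    insample_mp = np.concatenate(pool.map(kernel, torun)) < iso
    print 'cores = %d: %.3f s' % (cores, time.time() - tik)
    pool.close()
    pool.join()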
