How to efficiently vectorise hypergeometric CDF calculation?


Question


I have a set of data frames with 4 columns and 1,000,000 rows each. For each row I'd like to run a hypergeometric test that takes the 4 values of those columns as input and returns a p-value (using the cumulative distribution function of a hypergeometric distribution).

I have tried two implementations based on SciPy (below), but both scale badly. Is there any other way to achieve what I do below with better efficiency? I have a working solution written in R (at the bottom), but unfortunately the code has to be written in Python, because it will be used in an Airflow task that loads the data from a Postgres DB, and at the moment there is no Postgres hook for R.

Slow SciPy implementations

Sample data are created as follows (using 10,000 rows rather than the full 52 * 1,000,000 rows):

import numpy as np
import pandas as pd
from scipy.stats import hypergeom
from timeit import default_timer as timer

n_rows = 10000
n_total = 1000
max_good = 400
max_sample = 200
s = 100

df = pd.DataFrame({
  'ngood': np.random.hypergeometric(ngood=max_good, nbad=n_total - max_good,
                                    nsample=s, size=n_rows),
  'nsamp': np.random.hypergeometric(ngood=max_sample, nbad=n_total - max_sample,
                                    nsample=s, size=n_rows)
})

df = df.assign(kgood=np.array([
    np.random.hypergeometric(ngood=ngood, nbad=n_total - ngood,
                             nsample=nsamp, size=1)
    for ngood, nsamp
    in zip(df.ngood, df.nsamp)
]))

Slow implementation based on a list comprehension:

start = timer()
res = [
    hypergeom.cdf(k=ngood_found, M=n_total, n=ngood, N=nsamp)
    for ngood_found, ngood, nsamp
    in zip(df.kgood, df.ngood, df.nsamp)
]
end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))

[0.44247900002512713, 0.71587318053768023, 0.97215178135616498]
Elapsed time: 2.405838s

Slow implementation based on numpy vectorisation:

vectorized_test = np.vectorize(hypergeom.cdf, otypes=[float], excluded={'M'})
start = timer()
res = vectorized_test(k=df.kgood.values, M=n_total,
                      n=df.ngood.values, N=df.nsamp.values)
end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))

[ 0.442479    0.71587318  0.97215178]
Elapsed time: 2.518952s
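
For reference, hypergeom.cdf itself broadcasts over array arguments, so the np.vectorize wrapper can be dropped entirely; a minimal sketch (whether this helps depends on the SciPy version, since the per-element CDF cost dominates either way):

start = timer()
res = hypergeom.cdf(k=df.kgood.values, M=n_total,
                    n=df.ngood.values, N=df.nsamp.values)
end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))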

Fast R implementation

This shows that the above calculation can be completed within milliseconds. The trick is that phyper is vectorised at the C level, whereas np.vectorize is, AFAIK, essentially a Python loop.

library(microbenchmark)

n_rows <- 10000
n_total <- 1000
max_good <- 400
max_sample <- 200
s <- 100

df <- data.frame(
  ngood = rhyper(nn=n_rows, m=max_good, n=n_total - max_good, k=s),
  nsamp = rhyper(nn=n_rows, m=max_sample, n=n_total - max_sample, k=s)
)

df$kgood <- rhyper(nn=n_rows, m=df$ngood, n=n_total - df$ngood, k=df$nsamp)

microbenchmark(
  res <- phyper(q = df$k, m = df$ngood, n = n_total - df$ngood, k=df$nsamp)
)

Unit: milliseconds
                                                                 expr      
 phyper(q = df$k, m = df$ngood, n = n_total - df$ngood, k = df$nsamp) 

     min      lq     mean   median       uq      max neval
2.984852 3.00838 3.350509 3.134745 3.439138 5.462694   100

Solution

A small improvement can be obtained by caching the results of hypergeom.cdf:

from functools import lru_cache

# Alternative: the same caching via functools.lru_cache
# @lru_cache(maxsize=16 * 1024)
# def fn(k, n, N):
#     return hypergeom.cdf(k=k, M=n_total, n=n, N=N)

# Manual memoisation: many (k, n, N) triples repeat across rows, so each
# distinct CDF value only needs to be computed once.
data = {}
def fn(k, n, N):
    key = (k, n, N)
    if key not in data:
        data[key] = hypergeom.cdf(k=k, M=n_total, n=n, N=N)
    return data[key]

start = timer()
res = [
    fn(ngood_found, ngood, nsamp)
    for ngood_found, ngood, nsamp
    in zip(df.kgood, df.ngood, df.nsamp)
]

end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))

This gives, on my machine: Elapsed time: 0.279891s (0.315840s with lru_cache)
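
A related trick, sketched below, is to do the de-duplication with pandas instead of a Python-level dict: evaluate the CDF once per unique (kgood, ngood, nsamp) triple and merge the values back onto the full frame (a sketch, assuming triples repeat as in the sample data):

# Compute the CDF only for the distinct parameter triples...
uniq = df[['kgood', 'ngood', 'nsamp']].drop_duplicates()
uniq = uniq.assign(p=[
    hypergeom.cdf(k=k, M=n_total, n=n, N=N)
    for k, n, N in zip(uniq.kgood, uniq.ngood, uniq.nsamp)
])
# ...then broadcast the results back to all rows.
res = df.merge(uniq, on=['kgood', 'ngood', 'nsamp'], how='left')['p'].values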

EDIT:

In fact, it seems that the bottleneck is the calculation of the hypergeometric CDF itself, rather than the overhead of the Python loop. To test this, I created a SWIG interface file _cdf.i for the function gsl_cdf_hypergeometric_P from the GSL library.

%module cdf
%{
#include "gsl/gsl_cdf.h"
%}
double gsl_cdf_hypergeometric_P(int, int, int, int);

This file is then compiled into a Python extension module with:

swig -c++ -python _cdf.i
g++ -fPIC -c _cdf_wrap.cxx -I${HOME}/venvs/p3/include/python3.5m
g++ -shared _cdf_wrap.o -o _cdf.so -lgsl

Either way, the function can then be used directly in the original example:

import numpy as np
import pandas as pd
from scipy.stats import hypergeom
from timeit import default_timer as timer
from cdf import gsl_cdf_hypergeometric_P

n_rows = 10000
n_total = 1000
max_good = 400
max_sample = 200
s = 100

df = pd.DataFrame({
  'ngood': np.random.hypergeometric(ngood=max_good, nbad=n_total - max_good,
                                    nsample=s, size=n_rows),
  'nsamp': np.random.hypergeometric(ngood=max_sample, nbad=n_total - max_sample,
                                    nsample=s, size=n_rows)
})

df = df.assign(kgood=np.array([
    np.random.hypergeometric(ngood=ngood, nbad=n_total - ngood,
                             nsample=nsamp, size=1)
    for ngood, nsamp
    in zip(df.ngood, df.nsamp)
]))

start = timer()
res = [
    hypergeom.cdf(k=ngood_found, M=n_total, n=ngood, N=nsamp)
    for ngood_found, ngood, nsamp
    in zip(df.kgood, df.ngood, df.nsamp)
]
end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))

def cdf(k, M, n, N):
    # Map SciPy's (k, M, n, N) parameterisation onto GSL's
    # gsl_cdf_hypergeometric_P(k, n1, n2, t): n1 = "good" items,
    # n2 = "bad" items (M - n), t = sample size.
    return gsl_cdf_hypergeometric_P(int(k), int(n), int(M - n), int(N))

start = timer()
res = [
    cdf(k=ngood_found, M=n_total, n=ngood, N=nsamp)
    for ngood_found, ngood, nsamp
    in zip(df.kgood, df.ngood, df.nsamp)
]
end = timer()
print(res[0:10])
print("Elapsed time: %fs" % (end - start))

This yields:

[0.58605423287644209, 0.38055520197355552, 0.70597920363472055, 0.99728041338849138, 0.79797439957395955, 0.42245057292366844, 0.58627644982763727, 0.74819471224742817, 0.75121042470714849, 0.48561471798885397]
Elapsed time: 2.069916s
[0.5860542328771666, 0.38055520197384757, 0.7059792036350717, 0.997280413389543, 0.7979743995750694, 0.4224505729249291, 0.5862764498272103, 0.7481947122472634, 0.7512104247082603, 0.4856147179890127]
Elapsed time: 0.018253s

So even with an ordinary for loop, the speed-up is quite significant.
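
To keep the calling code array-oriented, the scalar wrapper can be lifted with np.frompyfunc; this still loops in Python under the hood, but each call is now cheap (a sketch using the cdf wrapper defined above):

# np.frompyfunc gives the wrapper NumPy broadcasting semantics; it
# returns an object array, hence the final cast to float.
gsl_cdf_vec = np.frompyfunc(cdf, 4, 1)
res = gsl_cdf_vec(df.kgood.values, n_total,
                  df.ngood.values, df.nsamp.values).astype(float)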
