计算包含高维矢量的两个矩阵之间的最小欧氏距离的最快方法 [英] Fastest way to calculate mininum euclidean distance between two matrices containing high dimensional vectors

查看:2115
本文介绍了计算包含高维矢量的两个矩阵之间的最小欧氏距离的最快方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在另一个线程,但后来我专注于如何使用OpenCV。无法实现我最初想要的,我会在这里询问我想要什么。



我有两个矩阵。矩阵a是2782x128,矩阵b是4000x128,都是无符号的char值。这些值存储在单个数组中。对于a中的每个向量,我需要b中的向量的索引与最接近的欧几里得距离。



好,现在我的代码实现这一点:

  #include< windows.h> 
#include< stdlib.h>
#include< stdio.h>
#include< cstdio>
#include< math.h>
#include< time.h>
#include< sys / timeb.h>
#include< iostream>
#include< fstream>
#includemain.h

using namespace std;

void main(int argc,char * argv [])
{
int a_size;
unsigned char * a = NULL;
read_matrix(& a,a_size,matrixa);
int b_size;
unsigned char * b = NULL;
read_matrix(& b,b_size,matrixb);

LARGE_INTEGER liStart
LARGE_INTEGER liEnd;
LARGE_INTEGER liPerfFreq;
QueryPerformanceFrequency(& liPerfFreq);
QueryPerformanceCounter(& liStart);

int * indexes = NULL;
min_distance_loop(& indexes,b,b_size,a,a_size);

QueryPerformanceCounter(& liEnd);

cout<< 循环时间:< (liEnd.QuadPart-liStart.QuadPart)/ long double(liPerfFreq.QuadPart)< s。 << endl;

if(a)
delete [] a;
if(b)
delete [] b;
if(indexes)
delete [] indexes;
return;
}

void read_matrix(unsigned char ** matrix,int& matrix_size,char * matrixPath)
{
ofstream myfile;
float f;
FILE * pFile;
pFile = fopen(matrixPath,r);
fscanf(pFile,%d,& matrix_size);
* matrix = new unsigned char [matrix_size * 128];

for(int i = 0; i {
unsigned int matPtr;
fscanf(pFile,%u,& matPtr);
matrix [i] =(unsigned char)matPtr;
}
fclose(pFile);
}

void min_distance_loop(int ** indices,unsigned char * b,int b_size,unsigned char * a,int a_size)
{
const int descrSize = 128;

* indexes =(int *)malloc(a_size * sizeof(int));
int dataIndex = 0;
int vocIndex = 0;
int min_distance;
int distance;
int multiply;

unsigned char * dataPtr;
unsigned char * vocPtr;
for(int i = 0; i {
min_distance = LONG_MAX;
for(int j = 0; j< b_size; ++ j)
{
distance = 0;
dataPtr =& a [dataIndex];
vocPtr =& b [vocIndex];

for(int k = 0; k {
multiply = * dataPtr ++ - * vocPtr ++;
distance + = multiply * multiply;
//如果距离大于先前计算的距离,则退出
if(distance> min_distance)
break;
}

//如果距离小于
if(distance< min_distance)
{
min_distance = distance;
(* indices)[i] = j;
}
vocIndex + = descrSize;
}
dataIndex + = descrSize;
vocIndex = 0;
}
}

附带示例矩阵的文件。 p>

matrixa
< a href =https://dl.dropbox.com/u/1474325/matrixb =nofollow> matrixb



我使用windows .h只是为了计算消耗时间,所以如果你想在另一个平台上测试代码,而不是windows,只需更改windows.h头并改变计算消耗时间的方式。



这段代码在我的电脑里约为0.5秒。问题是,我有另一个代码在Matlab中做同样的事情在0.05秒。在我的实验中,我每秒接收几个矩阵,如矩阵a,所以0.5秒是太多了。



现在matlab代码计算:

  aa = sum(a。* a,2); bb = sum(b。* b,2); ab = a * b'; 
d = sqrt(abs(repmat(aa,[1 size(bb,1)])+ repmat(bb',[size(aa,1)1]) -
[minz index] = min(d,[],2);

好的。 Matlab代码使用那个(x-a)^ 2 = x ^ 2 + a ^ 2 - 2ab。



所以我的下一次尝试是做同样的事情。我删除了我自己的代码进行相同的计算,但是它是1.2秒大约。



然后,我试图使用不同的外部库。第一个尝试是Eigen:

  const int descrSize = 128; 
MatrixXi a(a_size,descrSize);
MatrixXi b(b_size,descrSize);
MatrixXi ab(a_size,b_size);

unsigned char * dataPtr = matrixa;
for(int i = 0; i {
for(int j = 0; j< descrSize; ++ j)
{
a(i,j)=(int)* dataPtr ++;
}
}
unsigned char * vocPtr = matrixb;
for(int i = 0; i {
for(int j = 0; j< descrSize; ++ j)
{
b(i,j)=(int)* vocPtr ++;
}
}
ab = a * b.transpose();
a.cwiseProduct(a);
b.cwiseProduct(b);
MatrixXi aa = a.rowwise()。sum();
MatrixXi bb = b.rowwise()。sum();
MatrixXi d =(aa.replicate(1,vocabulary_size)+ bb.transpose()。replicate(nframes,1) - 2 * ab).cwiseAbs2

int * index = NULL;
index =(int *)malloc(nframes * sizeof(int));
for(int i = 0; i {
d.row(i).minCoeff(& index [i]);
}

这个本质代码的价格约为1.2, * b.transpose();



使用opencv的类似代码也使用了ab = a * b.transpose是0.65秒。



所以,真的很讨厌,matlab能够做这样的事情这么快,我不能在C + +!当然,能够运行我的实验将是伟大的,但我认为缺乏知识是什么真正令人讨厌的我。我如何实现至少与Matlab相同的性能?欢迎任何形式的解决。我的意思是,任何外部库(免费,如果可能),循环展开的东西,模板的东西,SSE intructions(我知道他们存在),缓存的东西。正如我所说,我的主要目的是增加我的知识,以便能够以更快的性能代码认为这样。



提前感谢



编辑:David Hammen建议的更多代码。在进行任何计算之前,我将数组转换为int。下面是代码:

  void min_distance_loop(int ** indices,unsigned char * b,int b_size,unsigned char * int a_size)
{
const int descrSize = 128;

int * a_int;
int * b_int;

LARGE_INTEGER liStart;
LARGE_INTEGER liEnd;
LARGE_INTEGER liPerfFreq;
QueryPerformanceFrequency(& liPerfFreq);
QueryPerformanceCounter(& liStart);

a_int =(int *)malloc(a_size * descrSize * sizeof(int));
b_int =(int *)malloc(b_size * descrSize * sizeof(int));

for(int i = 0; i a_int [i] =(int)a [i]
for(int i = 0; i b_int [i] =(int)b [i]

QueryPerformanceCounter(& liEnd);

cout<< 铸造时间:< (liEnd.QuadPart-liStart.QuadPart)/ long double(liPerfFreq.QuadPart)< s。 << endl;

* indexes =(int *)malloc(a_size * sizeof(int));
int dataIndex = 0;
int vocIndex = 0;
int min_distance;
int distance;
int multiply;

/ * unsigned char * dataPtr;
unsigned char * vocPtr; * /
int * dataPtr;
int * vocPtr;
for(int i = 0; i {
min_distance = LONG_MAX;
for(int j = 0; j< b_size; ++ j)
{
distance = 0;
dataPtr =& a_int [dataIndex];
vocPtr =& b_int [vocIndex];

for(int k = 0; k {
multiply = * dataPtr ++ - * vocPtr ++;
distance + = multiply * multiply;
//如果距离大于先前计算的距离,则退出
if(distance> min_distance)
break;
}

//如果距离小于
if(distance< min_distance)
{
min_distance = distance;
(* indices)[i] = j;
}
vocIndex + = descrSize;
}
dataIndex + = descrSize;
vocIndex = 0;
}
}

整个过程现在是0.6,开始时为0.001秒。也许我做错了什么?



EDIT2:关于Eigen的什么?当我寻找外部libs他们总是谈论Eigen和他们的速度。我做错了什么?这里一个简单的代码使用Eigen显示它不是这么快。也许我缺少一些配置或一些标志,或...

  MatrixXd A = MatrixXd :: Random ; 
MatrixXd B = MatrixXd :: Random(1000,500);
MatrixXd X;

此代码约为0.9秒。

解决方案

正如你所观察到的,你的代码由表示大约2.8e9算术运算的矩阵产品支配。 Yopu说,Matlab(或更确切地说高度优化的MKL)计算它在大约0.05s。这表示57 GFLOPS的速率,表明它不仅使用向量化还有多线程。使用Eigen,您可以通过编译OpenMP启用多线程( -fopenmp with gcc)。在我的5岁的电脑(2.66Ghz Core2),使用浮动和4线程,你的产品大约0.053s和0.16s没有OpenMP,所以你的编译标志必然有一些问题。总结一下,为了获得最佳的Eigen:




  • 以64位模式编译

  • 使用float

  • 如果您的CPU具有超线程,则禁用它或定义 OMP_NUM_THREADS 环境变量到物理内核的数量(这是非常重要的,否则性能会非常糟糕!)

  • 任务运行,最好将 OMP_NUM_THREADS 减少到 nb_cores-1

  • 使用您可以使用的最新编译器,GCC,clang和ICC是最好的,MSVC通常较慢。


I started a similar question on another thread, but then I was focusing on how to use OpenCV. Having failed to achieve what I originally wanted, I will ask here exactly what I want.

I have two matrices. Matrix a is 2782x128 and Matrix b is 4000x128, both unsigned char values. The values are stored in a single array. For each vector in a, I need the index of the vector in b with the closest euclidean distance.

Ok, now my code to achieve this:

#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
#include <cstdio>
#include <math.h>
#include <time.h>
#include <sys/timeb.h>
#include <iostream>
#include <fstream>
#include "main.h"

using namespace std;

void main(int argc, char* argv[])
{
    int a_size;
    unsigned char* a = NULL;
    read_matrix(&a, a_size,"matrixa");
    int b_size;
    unsigned char* b = NULL;
    read_matrix(&b, b_size,"matrixb");

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    int* indexes = NULL;
    min_distance_loop(&indexes, b, b_size, a, a_size);

    QueryPerformanceCounter( &liEnd );

    cout << "loop time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    if (a)
    delete[]a;
if (b)
    delete[]b;
if (indexes)
    delete[]indexes;
    return;
}

void read_matrix(unsigned char** matrix, int& matrix_size, char* matrixPath)
{
    ofstream myfile;
    float f;
    FILE * pFile;
    pFile = fopen (matrixPath,"r");
    fscanf (pFile, "%d", &matrix_size);
    *matrix = new unsigned char[matrix_size*128];

    for (int i=0; i<matrix_size*128; ++i)
    {
        unsigned int matPtr;
        fscanf (pFile, "%u", &matPtr);
        matrix[i]=(unsigned char)matPtr;
    }
    fclose (pFile);
}

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    unsigned char* dataPtr;
    unsigned char* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a[dataIndex];
            vocPtr = &b[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

And attached are the files with sample matrices.

matrixa matrixb

I am using windows.h just to calculate the consuming time, so if you want to test the code in another platform than windows, just change windows.h header and change the way of calculating the consuming time.

This code in my computer is about 0.5 seconds. The problem is that I have another code in Matlab that makes this same thing in 0.05 seconds. In my experiments, I am receiving several matrices like matrix a every second, so 0.5 seconds is too much.

Now the matlab code to calculate this:

aa=sum(a.*a,2); bb=sum(b.*b,2); ab=a*b'; 
d = sqrt(abs(repmat(aa,[1 size(bb,1)]) + repmat(bb',[size(aa,1) 1]) - 2*ab));
[minz index]=min(d,[],2);

Ok. Matlab code is using that (x-a)^2 = x^2 + a^2 - 2ab.

So my next attempt was to do the same thing. I deleted my own code to make the same calculations, but It was 1.2 seconds approx.

Then, I tried to use different external libraries. The first attempt was Eigen:

const int descrSize = 128;
MatrixXi a(a_size, descrSize);
MatrixXi b(b_size, descrSize);
MatrixXi ab(a_size, b_size);

unsigned char* dataPtr = matrixa;
for (int i=0; i<nframes; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        a(i,j)=(int)*dataPtr++;
    }
}
unsigned char* vocPtr = matrixb;
for (int i=0; i<vocabulary_size; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        b(i,j)=(int)*vocPtr ++;
    }
}
ab = a*b.transpose();
a.cwiseProduct(a);
b.cwiseProduct(b);
MatrixXi aa = a.rowwise().sum();
MatrixXi bb = b.rowwise().sum();
MatrixXi d = (aa.replicate(1,vocabulary_size) + bb.transpose().replicate(nframes,1) - 2*ab).cwiseAbs2();

int* index = NULL;
index = (int*)malloc(nframes*sizeof(int));
for (int i=0; i<nframes; ++i)
{
    d.row(i).minCoeff(&index[i]);
}

This Eigen code costs 1.2 approx for just the line that says: ab = a*b.transpose();

A similar code using opencv was used also, and the cost of the ab = a*b.transpose(); was 0.65 seconds.

So, It is real annoying that matlab is able to do this same thing so quickly and I am not able in C++! Of course being able to run my experiment would be great, but I think the lack of knowledge is what really is annoying me. How can I achieve at least the same performance than in Matlab? Any kind of soluting is welcome. I mean, any external library (free if possible), loop unrolling things, template things, SSE intructions (I know they exist), cache things. As I said, my main purpose is increase my knowledge for being able to code thinks like this with a faster performance.

Thanks in advance

EDIT: more code suggested by David Hammen. I casted the arrays to int before making any calculations. Here is the code:

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    int* a_int;
    int* b_int;

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    a_int = (int*)malloc(a_size*descrSize*sizeof(int));
    b_int = (int*)malloc(b_size*descrSize*sizeof(int));

    for(int i=0; i<descrSize*a_size; ++i)
        a_int[i]=(int)a[i];
    for(int i=0; i<descrSize*b_size; ++i)
        b_int[i]=(int)b[i];

    QueryPerformanceCounter( &liEnd );

    cout << "Casting time: " << (liEnd.QuadPart - liStart.QuadPart) / long double(liPerfFreq.QuadPart) << "s." << endl;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    /*unsigned char* dataPtr;
    unsigned char* vocPtr;*/
    int* dataPtr;
    int* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = LONG_MAX;
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a_int[dataIndex];
            vocPtr = &b_int[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

The entire process is now 0.6, and the casting loops at the beginning are 0.001 seconds. Maybe I did something wrong?

EDIT2: Anything about Eigen? When I look for external libs they always talk about Eigen and their speed. I made something wrong? Here a simple code using Eigen that shows it is not so fast. Maybe I am missing some config or some flag, or ...

MatrixXd A = MatrixXd::Random(1000, 1000);
MatrixXd B = MatrixXd::Random(1000, 500);
MatrixXd X;

This code is about 0.9 seconds.

解决方案

As you observed, your code is dominated by the matrix product that represents about 2.8e9 arithmetic operations. Yopu say that Matlab (or rather the highly optimized MKL) computes it in about 0.05s. This represents a rate of 57 GFLOPS showing that it is not only using vectorization but also multi-threading. With Eigen, you can enable multi-threading by compiling with OpenMP enabled (-fopenmp with gcc). On my 5 years old computer (2.66Ghz Core2), using floats and 4 threads, your product takes about 0.053s, and 0.16s without OpenMP, so there must be something wrong with your compilation flags. To summary, to get the best of Eigen:

  • compile in 64bits mode
  • use floats (doubles are twice as slow owing to vectorization)
  • enable OpenMP
  • if your CPU has hyper-threading, then either disable it or define the OMP_NUM_THREADS environment variable to the number of physical cores (this is very important, otherwise the performance will be very bad!)
  • if you have other task running, it might be a good idea to reduce OMP_NUM_THREADS to nb_cores-1
  • use the most recent compiler that you can, GCC, clang and ICC are best, MSVC is usually slower.

这篇关于计算包含高维矢量的两个矩阵之间的最小欧氏距离的最快方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆