Fastest way to calculate minimum Euclidean distance between two matrices containing high-dimensional vectors


Question

I started a similar question in another thread, but there I was focusing on how to use OpenCV. Since that did not get me what I originally wanted, I will ask here exactly what I need.

I have two matrices. Matrix a is 2782x128 and matrix b is 4000x128, both holding unsigned char values. The values of each matrix are stored in a single array. For each vector in a, I need the index of the vector in b with the smallest Euclidean distance.

OK, here is my code to achieve this:

#include <windows.h>
#include <stdlib.h>
#include <stdio.h>
#include <cstdio>
#include <math.h>
#include <time.h>
#include <sys/timeb.h>
#include <iostream>
#include <fstream>
#include "main.h"

using namespace std;

int main(int argc, char* argv[])
{
    int a_size;
    unsigned char* a = NULL;
    read_matrix(&a, a_size,"matrixa");
    int b_size;
    unsigned char* b = NULL;
    read_matrix(&b, b_size,"matrixb");

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    int* indexes = NULL;
    min_distance_loop(&indexes, b, b_size, a, a_size);

    QueryPerformanceCounter( &liEnd );

    cout << "loop time: " << (liEnd.QuadPart - liStart.QuadPart) / (long double)liPerfFreq.QuadPart << "s." << endl;

    // a and b were allocated with new[] in read_matrix
    delete[] a;
    delete[] b;
    // indexes was allocated with malloc in min_distance_loop
    free(indexes);
    return 0;
}

void read_matrix(unsigned char** matrix, int& matrix_size, char* matrixPath)
{
    FILE * pFile;
    pFile = fopen (matrixPath,"r");
    // The first value in the file is the number of 128-element rows
    fscanf (pFile, "%d", &matrix_size);
    *matrix = new unsigned char[matrix_size*128];

    for (int i=0; i<matrix_size*128; ++i)
    {
        unsigned int value;
        fscanf (pFile, "%u", &value);
        // Write into the allocated buffer, not into the pointer-to-pointer
        (*matrix)[i]=(unsigned char)value;
    }
    fclose (pFile);
}

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    unsigned char* dataPtr;
    unsigned char* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = INT_MAX; // min_distance is an int; LONG_MAX does not fit where long is 64-bit
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a[dataIndex];
            vocPtr = &b[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

The files with the sample matrices (matrixa and matrixb) are attached.

I use windows.h only to measure the elapsed time, so if you want to test the code on a platform other than Windows, just replace the windows.h header and change the way the elapsed time is measured.
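For readers who want to run the code outside Windows, the timing can be done with the standard <chrono> header instead of QueryPerformanceCounter. The following is only a minimal sketch: best_time_seconds is a hypothetical helper name that does not appear in the original code, and taking the best of several runs is an assumption about how one might want to benchmark, not something the question does.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical helper (not part of the original code): calls `work`
// `repeats` times and returns the shortest wall-clock time in seconds.
// std::chrono::steady_clock is monotonic, so it is suitable for intervals.
template <typename Work>
double best_time_seconds(Work work, int repeats)
{
    assert(repeats > 0);
    double best = -1.0;
    for (int r = 0; r < repeats; ++r)
    {
        const auto start = std::chrono::steady_clock::now();
        work();
        const auto end = std::chrono::steady_clock::now();
        const double elapsed = std::chrono::duration<double>(end - start).count();
        if (best < 0.0 || elapsed < best)
            best = elapsed;
    }
    return best;
}
```

With repeats set to 1 this measures a single call, for example a lambda that wraps the call to min_distance_loop. Note that min_distance_loop allocates its result on every call, so repeated runs would need to free it each time.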

On my computer this code takes about 0.5 seconds. The problem is that I have other code in Matlab that does the same thing in 0.05 seconds. In my experiments I receive several matrices like matrix a every second, so 0.5 seconds is too much.

Here is the Matlab code that calculates this:

aa=sum(a.*a,2); bb=sum(b.*b,2); ab=a*b'; 
d = sqrt(abs(repmat(aa,[1 size(bb,1)]) + repmat(bb',[size(aa,1) 1]) - 2*ab));
[minz index]=min(d,[],2);

OK. The Matlab code uses the identity (a-b)^2 = a^2 + b^2 - 2ab.
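To make the identity concrete, here is a small C++ sketch of a nearest-row search built on it. It is an illustration under stated assumptions, not the asker's code: nearest_by_expansion and its parameters are hypothetical names, the matrices are assumed to be row-major std::vector buffers, and the sketch skips the square root and the |a|^2 term because neither changes which row of b is closest.

```cpp
#include <cassert>
#include <climits>
#include <cstddef>
#include <vector>

// Illustrative sketch (hypothetical names). For each row of a, returns the
// index of the row of b with the smallest squared Euclidean distance, using
// |x - y|^2 = |x|^2 + |y|^2 - 2*(x . y).
std::vector<int> nearest_by_expansion(const std::vector<unsigned char>& a, int a_rows,
                                      const std::vector<unsigned char>& b, int b_rows,
                                      int dim)
{
    assert(a.size() == static_cast<std::size_t>(a_rows) * dim);
    assert(b.size() == static_cast<std::size_t>(b_rows) * dim);

    // Squared norm of every row of b, computed once up front.
    std::vector<int> bb(b_rows, 0);
    for (int j = 0; j < b_rows; ++j)
        for (int k = 0; k < dim; ++k)
            bb[j] += b[j * dim + k] * b[j * dim + k];

    std::vector<int> nearest(a_rows, -1);
    for (int i = 0; i < a_rows; ++i)
    {
        int best = INT_MAX;
        for (int j = 0; j < b_rows; ++j)
        {
            // Dot product of row i of a with row j of b.
            int dot = 0;
            for (int k = 0; k < dim; ++k)
                dot += a[i * dim + k] * b[j * dim + k];

            // |a_i|^2 is the same for every j, so it is left out of the score.
            const int score = bb[j] - 2 * dot;
            if (score < best)
            {
                best = score;
                nearest[i] = j;
            }
        }
    }
    return nearest;
}
```

Written as plain loops like this, the expansion does no less work than the direct difference-and-square loop. Its benefit comes from the dot products forming a single matrix product a*b', which an optimized matrix-multiplication routine can compute much faster than a hand-written triple loop.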

So my next attempt was to do the same thing. I rewrote my own code to perform the same calculations, but it took approximately 1.2 seconds.

Then I tried different external libraries. The first attempt was Eigen:

const int descrSize = 128;
MatrixXi a(a_size, descrSize);
MatrixXi b(b_size, descrSize);
MatrixXi ab(a_size, b_size);

unsigned char* dataPtr = matrixa;
for (int i=0; i<nframes; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        a(i,j)=(int)*dataPtr++;
    }
}
unsigned char* vocPtr = matrixb;
for (int i=0; i<vocabulary_size; ++i)
{
    for (int j=0; j<descrSize; ++j)
    {
        b(i,j)=(int)*vocPtr ++;
    }
}
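ab = a*b.transpose();
// cwiseProduct returns a new expression and does not modify a or b in place,
// so the squares have to be summed in the same statement.
MatrixXi aa = a.cwiseProduct(a).rowwise().sum();
MatrixXi bb = b.cwiseProduct(b).rowwise().sum();
// Squared distances; the square root is monotonic, so it is not needed to find the minimum.
MatrixXi d = aa.replicate(1,vocabulary_size) + bb.transpose().replicate(nframes,1) - 2*ab;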

int* index = NULL;
index = (int*)malloc(nframes*sizeof(int));
for (int i=0; i<nframes; ++i)
{
    d.row(i).minCoeff(&index[i]);
}

This Eigen code takes approximately 1.2 seconds for just this one line: ab = a*b.transpose();

I also tried similar code using OpenCV, where ab = a*b.transpose(); took 0.65 seconds.

So it is really annoying that Matlab can do this same thing so quickly and I cannot do it in C++! Of course being able to run my experiment would be great, but I think the lack of knowledge is what really annoys me. How can I achieve at least the same performance as in Matlab? Any kind of solution is welcome: any external library (free if possible), loop unrolling, templates, SSE instructions (I know they exist), cache tricks. As I said, my main purpose is to improve my knowledge so that I can code things like this with better performance.

Thanks in advance.

More code, as suggested by David Hammen. I cast the arrays to int before doing any calculations. Here is the code:

void min_distance_loop(int** indexes, unsigned char* b, int b_size, unsigned char* a, int a_size)
{
    const int descrSize = 128;

    int* a_int;
    int* b_int;

    LARGE_INTEGER liStart;
    LARGE_INTEGER liEnd;
    LARGE_INTEGER liPerfFreq;
    QueryPerformanceFrequency( &liPerfFreq );
    QueryPerformanceCounter( &liStart );

    a_int = (int*)malloc(a_size*descrSize*sizeof(int));
    b_int = (int*)malloc(b_size*descrSize*sizeof(int));

    for(int i=0; i<descrSize*a_size; ++i)
        a_int[i]=(int)a[i];
    for(int i=0; i<descrSize*b_size; ++i)
        b_int[i]=(int)b[i];

    QueryPerformanceCounter( &liEnd );

    cout << "Casting time: " << (liEnd.QuadPart - liStart.QuadPart) / (long double)liPerfFreq.QuadPart << "s." << endl;

    *indexes = (int*)malloc(a_size*sizeof(int));
    int dataIndex=0;
    int vocIndex=0;
    int min_distance;
    int distance;
    int multiply;

    /*unsigned char* dataPtr;
    unsigned char* vocPtr;*/
    int* dataPtr;
    int* vocPtr;
    for (int i=0; i<a_size; ++i)
    {
        min_distance = INT_MAX; // min_distance is an int; LONG_MAX does not fit where long is 64-bit
        for (int j=0; j<b_size; ++j)
        {
            distance=0;
            dataPtr = &a_int[dataIndex];
            vocPtr = &b_int[vocIndex];

            for (int k=0; k<descrSize; ++k)
            {
                multiply = *dataPtr++-*vocPtr++;
                distance += multiply*multiply;
                // If the distance is greater than the previously calculated, exit
                if (distance>min_distance)
                    break;
            }

            // if distance smaller
            if (distance<min_distance)
            {
                min_distance = distance;
                (*indexes)[i] = j;
            }
            vocIndex+=descrSize;
        }
        dataIndex+=descrSize;
        vocIndex=0;
    }
}

The entire process now takes 0.6 seconds, and the casting loops at the beginning take 0.001 seconds. Maybe I did something wrong?

Anything about Eigen? Whenever I look for external libraries, people talk about Eigen and its speed. Did I do something wrong? Here is a simple piece of code using Eigen that shows it is not so fast. Maybe I am missing some configuration or some flag, or ...

MatrixXd A = MatrixXd::Random(1000, 1000);
MatrixXd B = MatrixXd::Random(1000, 500);
MatrixXd X;

This code takes about 0.9 seconds.

Recommended answer

As you observed, your code is dominated by the matrix product, which represents about 2.8e9 arithmetic operations (2782 x 4000 x 128 multiply-add pairs). You say that Matlab (or rather the highly optimized MKL) computes it in about 0.05 s. That is a rate of 57 GFLOPS, which shows that it is using not only vectorization but also multi-threading. With Eigen, you can enable multi-threading by compiling with OpenMP enabled (-fopenmp with gcc). On my 5-year-old computer (2.66 GHz Core2), using floats and 4 threads, your product takes about 0.053 s, and 0.16 s without OpenMP, so there must be something wrong with your compilation flags. To summarize, to get the best out of Eigen:

  • compile in 64-bit mode
  • use floats (doubles are twice as slow owing to vectorization)
  • enable OpenMP
  • if your CPU has hyper-threading, either disable it or set the OMP_NUM_THREADS environment variable to the number of physical cores (this is very important, otherwise the performance will be very bad!)
  • if you have other tasks running, it might be a good idea to reduce OMP_NUM_THREADS to nb_cores-1
  • use the most recent compiler that you can; GCC, clang and ICC are best, MSVC is usually slower.
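As a rough illustration of the "use floats" and "enable OpenMP" points, the sketch below applies them to a hand-written nearest-row search. It is not Eigen's implementation and does not reproduce the timings above: nearest_parallel and its parameters are hypothetical names, the matrices are assumed to have been converted to row-major float buffers beforehand, and the only OpenMP feature used is the standard parallel for pragma.

```cpp
#include <cassert>
#include <cfloat>
#include <cstddef>
#include <vector>

// Illustrative sketch (hypothetical names), not Eigen's implementation.
// For each row of a, finds the row of b with the smallest squared distance.
std::vector<int> nearest_parallel(const std::vector<float>& a, int a_rows,
                                  const std::vector<float>& b, int b_rows,
                                  int dim)
{
    assert(a.size() == static_cast<std::size_t>(a_rows) * dim);
    assert(b.size() == static_cast<std::size_t>(b_rows) * dim);

    std::vector<int> nearest(a_rows, -1);

    // Every row of a is independent, so the outer loop can be split across
    // threads. If OpenMP is not enabled at compile time the pragma is ignored
    // and the loop simply runs serially.
    #pragma omp parallel for
    for (int i = 0; i < a_rows; ++i)
    {
        const float* x = &a[static_cast<std::size_t>(i) * dim];
        float best = FLT_MAX;
        int best_j = -1;
        for (int j = 0; j < b_rows; ++j)
        {
            const float* y = &b[static_cast<std::size_t>(j) * dim];
            float dist = 0.0f;
            // A simple float loop like this is a candidate for compiler
            // auto-vectorization when optimizations are turned on.
            for (int k = 0; k < dim; ++k)
            {
                const float diff = x[k] - y[k];
                dist += diff * diff;
            }
            if (dist < best)
            {
                best = dist;
                best_j = j;
            }
        }
        nearest[i] = best_j; // each thread writes only its own rows
    }
    return nearest;
}
```

With gcc this would be built with optimizations and -fopenmp, and the number of threads can then be controlled through the OMP_NUM_THREADS environment variable mentioned in the list above.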
