Parallel Cosine similarity of two large files with each other


Question

I have two files: A and B

A has 400,000 lines each having 50 float values
B has 40,000 lines having 50 float values.

For every line in B, I need to find corresponding lines in A which have >90% similarity (cosine).
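
For reference, the cosine similarity used here is the standard definition:

    cos(a, b) = (a · b) / (‖a‖ ‖b‖)

so ">90% similarity" means cos(a, b) > 0.9 (note that the code shown further down actually tests against 0.95).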

For linear search and computation, the code takes ginormous computing time. (40-50 hours)

Reaching out to the community for suggestions on how to speed up the process (links to blogs/resources such as AWS/Cloud that could be used to achieve it). Have been stuck with this for quite a while!

[There were mentions of rpud/rpudplus to do it, but can't seem to perform them on cloud resources]

N.B. as requested, the code for cosine similarity is:

import os
from scipy import spatial

# f1/f2 hold B's labels and vectors; enLabelValues/enVectorValues are
# the paths to A's label and vector files (defined elsewhere)
cnt = 0
a = {}
for line1, line2 in zip(f1, f2):
    line1 = line1[:-1]
    cnt += 1
    l2X = [float(i) for i in line2.split()]
    # NB: reopening and rescanning A's files for every line of B is a
    # large part of the cost, on top of the O(|A| * |B|) arithmetic
    f3 = open(enLabelValues, 'r')
    f4 = open(enVectorValues, 'r')
    print(cnt)
    cnt_j = 0
    for line3, line4 in zip(f3, f4):
        line3 = line3[:-1]
        l4X = [float(i) for i in line4.split()]
        # Cosine similarity = 1 - cosine distance
        result = 1 - spatial.distance.cosine(l2X, l4X)
        cnt_j += 1
        if result > 0.95:
            if line3 not in a:
                a[line3] = True
                fLabel_2.write(line3 + "\n")
                fX_2.write(line4)
        fLabel_2.flush()
        fX_2.flush()
        os.fsync(fLabel_2.fileno())
        os.fsync(fX_2.fileno())
    f3.close()
    f4.close()
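
The nested loops above redo all the arithmetic in pure Python, row by row. As a hedged sketch of a faster pure-Python route (not the original poster's code; it assumes both files fit in memory, and `similar_pairs` and its parameters are illustrative names), the whole comparison collapses to a few NumPy matrix products:

```python
import numpy as np

def similar_pairs(A, B, threshold=0.9, chunk=256):
    """Return (i, j) pairs where cosine(B[i], A[j]) > threshold."""
    # Normalise every row to unit length once, up front, so each
    # pairwise cosine similarity becomes a plain dot product
    A_unit = A / np.linalg.norm(A, axis=1, keepdims=True)
    B_unit = B / np.linalg.norm(B, axis=1, keepdims=True)
    pairs = []
    # Work through B in chunks so the similarity matrix stays small:
    # a full 40,000 x 400,000 float matrix would not fit in RAM
    for start in range(0, B_unit.shape[0], chunk):
        sims = B_unit[start:start + chunk] @ A_unit.T
        i, j = np.nonzero(sims > threshold)
        pairs.extend(zip((i + start).tolist(), j.tolist()))
    return pairs
```

The rows would come from something like `A = np.loadtxt("a.txt", dtype=np.float32)`; with float32 data each chunk's similarity block is a few hundred MB at most, and the inner work is handed to BLAS instead of a Python loop.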

Answer

I can generate synthetic files of 40,000 and 400,000 lines with 50 samples per line and process them in around 2 minutes 18 seconds on a reasonable 4 core (+hyperthreading) desktop iMac in my clumsy style of C++ without any SIMD optimisation (by me) using GNU Parallel.

Here is the top-level script. You can see it generates the test data in "a.txt" and "b.txt". Then it "compresses" "b.txt" to an identical binary representation, with the pre-computed magnitude appended to each line. Finally, it numbers the lines in "a.txt" and passes them into GNU Parallel which splits the lines into groups of around 5,200 lines and starts a group of 8 parallel processes to compare each of those lines with the 40,000 lines in B.

#!/bin/bash

# Generate test data - a.txt b.txt
./generate

# Preprocess b.txt into binary with precomputed magnitudes, saved as B
./preprocess

# Process file A in batches
cat -n a.txt | parallel --block-size 2M --line-buffer --pipe ./process {#}

Here is the generate.cpp program to synthesise data:

#include <iostream>
#include <cstdlib>
#include <fstream>
#include "common.h"

using namespace std;

int main()
{
   int line,sample;
   ofstream a("a.txt");
   if (!a.is_open()){
      cerr << "ERROR: Unable to open output file";
      exit(EXIT_FAILURE);
   }
   for(line=0;line<ALINES;line++){
      for(sample=0;sample<SAMPLESPERLINE;sample++){
         a << (float)rand()*100/RAND_MAX << " ";
      }
      a << endl;
   }
   a.close();
   ofstream b("b.txt");
   if (!b.is_open()){
      cerr << "ERROR: Unable to open output file";
      exit(EXIT_FAILURE);
   }
   for(line=0;line<BLINES;line++){
      for(sample=0;sample<SAMPLESPERLINE;sample++){
         b << (float)rand()*100/RAND_MAX << " ";
      }
      b << endl;
   }
   b.close();
}

Here is the preprocess.cpp code:

#include <sstream>
#include <fstream>
#include <string>
#include <iostream>
#include <stdlib.h>
#include <vector>
#include <cmath>
#include "common.h"

int main(int argc, char* argv[]){

   std::ifstream btxt("b.txt");
   std::ofstream bbin("B",std::ios::out|std::ios::binary);
   if (!btxt.is_open()){
      std::cerr << "ERROR: Unable to open input file";
      exit(EXIT_FAILURE);
   }
   if (!bbin.is_open()){
      std::cerr << "ERROR: Unable to open output file";
      exit(EXIT_FAILURE);
   }

   int l=0;
   std::string line;
   std::vector<float> v;
   v.resize(SAMPLESPERLINE+1);
   while (std::getline(btxt,line)){
      std::istringstream iss(line);
      // v keeps its size from the resize() above; each iteration
      // simply overwrites its SAMPLESPERLINE+1 slots
      float f;
      double magnitude;
      magnitude=0.0;
      int s=0;
      while (iss >> f){
         v[s]=(f);
         magnitude+=(double)f*f;
         s++;
      }
      // Append the magnitude to the end of the "line"
      v[s]=(float)sqrt(magnitude);
      // Write the samples and magnitude in binary to the output file
      bbin.write(reinterpret_cast<char*>(&v[0]),(SAMPLESPERLINE+1)*sizeof(float));
      l++;
   }
   btxt.close();
   bbin.close();

   return EXIT_SUCCESS;
}

Here is the common.h file:

const int ALINES=400000;
const int BLINES=40000;
const int SAMPLESPERLINE=50;

Here is the process.cpp code:

#include <sstream>
#include <fstream>
#include <string>
#include <iostream>
#include <stdlib.h>
#include <vector>
#include <array>
#include <cmath>
#include "common.h"

int main(int argc, char* argv[]){

   if(argc!=2){
      std::cerr << "Usage: process JOBNUM" << std::endl;
      exit(1);
   }
   int JobNum=std::atoi(argv[1]);
   std::cerr << "Starting job: " << JobNum << std::endl;

   // Load B
   std::ifstream bbin("B",std::ios::binary);
   if (!bbin.is_open()){
      std::cerr << "ERROR: Unable to open B";
      exit(EXIT_FAILURE);
   }

   int l=0;
   std::array<float,SAMPLESPERLINE+1> record;
   std::vector<std::array<float,SAMPLESPERLINE+1>> B;
   B.resize(BLINES);
   for(l=0;l<BLINES;l++){
      // Read one record of 50 floats and their magnitude
      bbin.read(reinterpret_cast<char*>(&B[l][0]),sizeof(float)*(SAMPLESPERLINE+1));
   }
   bbin.close();

   // Process all lines read from stdin, each line prepended by its line number
   // Format is:
   // <line number in file "a.txt"> <SAMPLE0> <SAMPLE1> ... <SAMPLE49>
   int nLines=0;
   std::string line;
   while (std::getline(std::cin,line)){
      nLines++;
      std::istringstream iss(line);
      std::vector<float> A;
      A.resize(SAMPLESPERLINE);
      float f;
      int Alineno;
      int s=0;
      iss >> Alineno;
      double dMag=0.0;
      while (iss >> f){
         A[s++]=f;
         dMag+=(double)f*f;
      }
      // Root magnitude
      float AMagnitude=(float)sqrt(dMag);

      // At this point we have in B, 40,000 records each of 50 samples followed by the magnitude
      // ... and we have a single record from "a.txt" with 50 samples and its magnitude in AMagnitude
      // ... and Alineno is the absolute line number in "a.txt" of this line
      // Time to do the actual calculation: compare this record to all records in B
      for(int brec=0;brec<BLINES;brec++){
         float BMagnitude=B[brec][SAMPLESPERLINE];
         double dotproduct=0.0;
         float *a = &A[0];
         float *b = &B[brec][0];
         for(s=0;s<SAMPLESPERLINE;s++){
            dotproduct += (*a++) * (*b++);
         }
         float similarity = dotproduct/(AMagnitude*BMagnitude);
         if(similarity>0.99){
            std::cout << "Line A: " << Alineno << ", line B: " << brec << ", similarity:" << similarity << std::endl;
         }
      }
   }
   std::cerr << "Ending job: " << JobNum << ", processed " << nLines << " lines" << std::endl;

   return EXIT_SUCCESS;
}

The Makefile is very simple:

CFLAGS= -std=c++11 -O3 -march=native

all:    generate preprocess process

generate:   generate.cpp
        clang++ ${CFLAGS} generate.cpp -o generate

preprocess: preprocess.cpp
        clang++ ${CFLAGS} preprocess.cpp -o preprocess

process:    process.cpp
        clang++ ${CFLAGS} process.cpp -o process


When you run it, it pegs the CPU for 2 minutes and looks like this:

time ./go
Starting job: 3
Starting job: 7
Starting job: 8
Starting job: 2
Starting job: 5
Starting job: 1
Starting job: 4
Starting job: 6
Ending job: 1, processed 5204 lines
Starting job: 9
Ending job: 2, processed 5203 lines
Ending job: 3, processed 5204 lines
Starting job: 11
Starting job: 10
Ending job: 4, processed 5204 lines
Starting job: 12
Ending job: 5, processed 5203 lines
Ending job: 6, processed 5203 lines
Starting job: 14
Starting job: 13
...
...
Starting job: 75
Ending job: 68, processed 5204 lines
Ending job: 69, processed 5203 lines
Starting job: 76
Starting job: 77
Ending job: 70, processed 5203 lines
Ending job: 71, processed 5204 lines
Ending job: 72, processed 5203 lines
Ending job: 77, processed 4535 lines
Ending job: 74, processed 5204 lines
Ending job: 73, processed 5205 lines
Ending job: 75, processed 5204 lines
Ending job: 76, processed 5203 lines

real    2m17.510s
user    16m24.533s
sys     0m4.426s


Note that I have not done any explicit SIMD or loop-unrolling or used any intrinsics to form the dot-product. I suspect if you asked a question about forming a dot-product and tagged it with simd or avx, someone would help you optimise it.

Note also that you could easily run this code across multiple computers with GNU Parallel, assuming you have ssh login to them, just using:

parallel -S host1,host2,host3 ....

For example, I have a 6-core Debian PC on my network, so I ran the above code parallelised across my 4-core Mac and 6-core Debian machine with:

parallel -S :,debian ...

This then took 1 minute 8 seconds.
