Sparse array compression using SIMD (AVX2)


Problem description

I have a sparse array a (mostly zeroes):

unsigned char a[1000000]; 

and I would like to create an array b of indexes to the non-zero elements of a, using SIMD instructions on the Intel x64 architecture with AVX2. I'm looking for tips on how to do this efficiently. Specifically, is there a SIMD instruction (or sequence of instructions) to get the positions of consecutive non-zero elements in a SIMD register, arranged contiguously?
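
For illustration (this small example is mine, not part of the question), an input such as

unsigned char a[] = {0, 0, 3, 0, 7, 0, 0, 1};

should yield the index array b = {2, 4, 7}, i.e. the positions of the nonzero elements.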

Answer

Five methods to compute the indices of the nonzeros are:

  • Semi vectorized loop: Load a SIMD vector with chars, compare with zero and apply a movemask. Use a small scalar loop if any of the chars is nonzero (also suggested by @stgatilov). This works well for very sparse arrays. Function arr2ind_movmsk in the code below uses BMI1 instructions for the scalar loop.

  • Vectorized loop: Intel Haswell processors and newer support the BMI1 and BMI2 instruction sets. BMI2 contains the pext instruction (parallel bits extract, see the Wikipedia link), which turns out to be useful here. See arr2ind_pext in the code below; a small standalone illustration of the pext trick follows this list.

  • Classic scalar loop with an if statement: arr2ind_if.

  • Branchless scalar loop: arr2ind_cmov.

  • Lookup table: @stgatilov shows that it is possible to use a lookup table instead of the pdep and other integer instructions. This might work well; however, the lookup table is quite large: it doesn't fit in the L1 cache. Not tested here. See also the discussion here. A minimal sketch of the lookup-table idea follows this list.
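
The pext trick in arr2ind_pext may be easier to follow in isolation. The following small standalone program is my own illustration (the mask value and the file name are made up for the example): it shows how _pext_u64, applied to the constant 0xFEDCBA9876543210, packs the nibble indices of the set mask nibbles into the low bits of the result.

/*
gcc -O3 -march=haswell pext_demo.c
*/
#include <stdio.h>
#include <stdint.h>
#include <immintrin.h>

int main(void){
   /* Suppose bytes 1, 4 and 7 of a 16-byte chunk are nonzero, so nibbles 1, 4 and 7 of the mask are set. */
   uint64_t cntr_const = 0xFEDCBA9876543210;           /* Nibble i of this constant equals i.             */
   uint64_t msk64      = 0x00000000F00F00F0;           /* Example mask: nibbles 1, 4 and 7 are 0xF.       */
   int      p          = _mm_popcnt_u64(msk64)>>2;     /* Number of nonzero bytes: 12 set bits / 4 = 3.   */
   uint64_t cntr       = _pext_u64(cntr_const,msk64);  /* Packs the byte indices: cntr = 0x741.           */
   printf("p = %d,  cntr = 0x%llX\n",p,(unsigned long long)cntr);    /* Prints: p = 3,  cntr = 0x741      */
   return 0;
}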

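For completeness, here is a minimal sketch of the lookup-table idea. It is only my own illustration under simplifying assumptions: it uses an 8-bit mask per group of 8 bytes so that the table (256 entries) stays tiny, whereas the table-based variant discussed above presumably works on wider masks, which is why its table no longer fits in the L1 cache. The function and table names are made up.

#include <stdint.h>

/* Offsets of the set bits for every possible 8-bit mask value. */
uint8_t offset_lut[256][8];
uint8_t count_lut[256];

void init_lut(void){
   for (int msk=0;msk<256;msk++){
      int c=0;
      for (int b=0;b<8;b++)
         if (msk & (1<<b)) offset_lut[msk][c++]=(uint8_t)b;
      count_lut[msk]=(uint8_t)c;
   }
}

/* Process a in groups of 8 bytes (n is assumed to be a multiple of 8 and ind large enough):
   build an 8-bit nonzero mask per group and let the table emit the indices.                 */
int arr2ind_lut(const unsigned char *a, int n, int *ind){
   int m0=0;
   for (int i=0;i<n;i+=8){
      unsigned msk=0;
      for (int b=0;b<8;b++) msk |= (unsigned)(a[i+b]!=0)<<b;
      for (int c=0;c<count_lut[msk];c++) ind[m0++]=i+offset_lut[msk][c];
   }
   return m0;                              /* Number of nonzeros found.                      */
}

In practice the mask would be produced with a SIMD compare and _mm_movemask_epi8 rather than with the scalar inner loop shown here. The full benchmark code from the answer follows.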

/* 
gcc -O3 -Wall -m64 -mavx2 -fopenmp -march=broadwell -std=c99 -falign-loops=16 sprs_char2ind.c

example: Test different methods with an array a of size 20000 and approximate 25/1024*100%=2.4% nonzeros:   
              ./a.out 20000 25
*/

#include <stdio.h>
#include <immintrin.h>
#include <stdint.h>
#include <omp.h> 
#include <string.h>


__attribute__ ((noinline)) int arr2ind_movmsk(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0, k;
   __m256i msk;
   m0=0;
   for (i=0;i<n;i=i+32){                              /* Load 32 bytes and compare with zero:           */
      msk=_mm256_cmpeq_epi8(_mm256_load_si256((__m256i *)&a[i]),_mm256_setzero_si256());
      k=_mm256_movemask_epi8(msk);
      k=~k;                                           /* Search for nonzero bits instead of zero bits.  */
      while (k){
         ind[m0]=i+_tzcnt_u32(k);                     /* Count the number of trailing zero bits in k.   */
         m0++;
         k=_blsr_u32(k);                              /* Clear the lowest set bit in k.                 */
      }
   }
   *m=m0;
   return 0;
}


__attribute__ ((noinline)) int arr2ind_pext(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0;
   uint64_t     cntr_const = 0xFEDCBA9876543210;
   __m256i      shft       = _mm256_set_epi64x(0x04,0x00,0x04,0x00);
   __m256i      vmsk       = _mm256_set1_epi8(0x0F);
   __m256i      cnst16     = _mm256_set1_epi32(16);
   __m256i      shf_lo     = _mm256_set_epi8(0x80,0x80,0x80,0x0B,   0x80,0x80,0x80,0x03,   0x80,0x80,0x80,0x0A,   0x80,0x80,0x80,0x02,
                                             0x80,0x80,0x80,0x09,   0x80,0x80,0x80,0x01,   0x80,0x80,0x80,0x08,   0x80,0x80,0x80,0x00);
   __m256i      shf_hi     = _mm256_set_epi8(0x80,0x80,0x80,0x0F,   0x80,0x80,0x80,0x07,   0x80,0x80,0x80,0x0E,   0x80,0x80,0x80,0x06,
                                             0x80,0x80,0x80,0x0D,   0x80,0x80,0x80,0x05,   0x80,0x80,0x80,0x0C,   0x80,0x80,0x80,0x04);
   __m128i      pshufbcnst = _mm_set_epi8(0x80,0x80,0x80,0x80,0x80,0x80,0x80,0x80,  0x0E,0x0C,0x0A,0x08,0x06,0x04,0x02,0x00);                                            

   __m256i      i_vec      = _mm256_setzero_si256();
   m0=0;
   for (i=0;i<n;i=i+16){
      __m128i   v          = _mm_load_si128((__m128i *)&a[i]);                     /* Load 16 bytes.                                                                               */
      __m128i   msk        = _mm_cmpeq_epi8(v,_mm_setzero_si128());                /* Generate 16x8 bit mask.                                                                      */
                msk        = _mm_srli_epi64(msk,4);                                /* Pack 16x8 bit mask to 16x4 bit mask.                                                         */
                msk        = _mm_shuffle_epi8(msk,pshufbcnst);                     /* Pack 16x8 bit mask to 16x4 bit mask.                                                         */
                msk        = _mm_xor_si128(msk,_mm_set1_epi32(-1));                /* Invert 16x4 mask.                                                                            */
      uint64_t  msk64      = _mm_cvtsi128_si64x(msk);                              /* _mm_popcnt_u64 and _pext_u64 work on 64-bit general-purpose registers, not on simd registers.*/
      int       p          = _mm_popcnt_u64(msk64)>>2;                             /* p is the number of nonzeros in 16 bytes of a.                                                */
      uint64_t  cntr       = _pext_u64(cntr_const,msk64);                          /* parallel bits extract. cntr contains p 4-bit integers. The 16 4-bit integers in cntr_const are shuffled to the p 4-bit integers that we want */
                                                                                   /* The next 7 intrinsics unpack these p 4-bit integers to p 32-bit integers.                    */  
      __m256i   cntr256    = _mm256_set1_epi64x(cntr);
                cntr256    = _mm256_srlv_epi64(cntr256,shft);
                cntr256    = _mm256_and_si256(cntr256,vmsk);
      __m256i   cntr256_lo = _mm256_shuffle_epi8(cntr256,shf_lo);
      __m256i   cntr256_hi = _mm256_shuffle_epi8(cntr256,shf_hi);
                cntr256_lo = _mm256_add_epi32(i_vec,cntr256_lo);
                cntr256_hi = _mm256_add_epi32(i_vec,cntr256_hi);

                             _mm256_storeu_si256((__m256i *)&ind[m0],cntr256_lo);     /* Note that the stores of iteration i and i+16 may overlap.                                                         */
                             _mm256_storeu_si256((__m256i *)&ind[m0+8],cntr256_hi);   /* Array ind has to be large enough to avoid segfaults. At most 16 integers are written more than strictly necessary */ 
                m0         = m0+p;
                i_vec      = _mm256_add_epi32(i_vec,cnst16);
   }
   *m=m0;
   return 0;
}


__attribute__ ((noinline)) int arr2ind_if(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0;
   m0=0;
   for (i=0;i<n;i++){
      if (a[i]!=0){
         ind[m0]=i;
         m0=m0+1;
      }
   }
   *m=m0;
   return 0;
}


__attribute__((noinline)) int arr2ind_cmov(const unsigned char * restrict a, int n, int * restrict ind, int * m){
   int i, m0;
   m0=0;
   for (i=0;i<n;i++){
      ind[m0]=i;
      m0=(a[i]==0)? m0 : m0+1;   /* Compiles to cmov instruction. */
   }
   *m=m0;
   return 0;
}


__attribute__ ((noinline)) int print_nonz(const unsigned char * restrict a, const int * restrict ind, const int m){
   int i;
   for (i=0;i<m;i++) printf("i=%d,  ind[i]=%d   a[ind[i]]=%u\n",i,ind[i],a[ind[i]]);
   printf("\n");   fflush( stdout );
   return 0;
}


__attribute__ ((noinline)) int print_chk(const unsigned char * restrict a, const int * restrict ind, const int m){
   int i;                              /* Compute a hash to compare the results of different methods. */
   unsigned int chk=0;
   for (i=0;i<m;i++){
      chk=((chk<<1)|(chk>>31))^(ind[i]);
   }
   printf("chk = %10X\n",chk);
   return 0;
}



int main(int argc, char **argv){
int n, i, m; 
unsigned int j, k, d;
unsigned char *a;
int *ind;
double t0,t1;
int meth, nrep;
char txt[30];

sscanf(argv[1],"%d",&n);            /* Length of array a.                                    */
n=n>>5;                             /* Adjust n to a multiple of 32.                         */
n=n<<5;
sscanf(argv[2],"%u",&d);            /* The approximate fraction of nonzeros in a is: d/1024  */
printf("n=%d,   d=%u\n",n,d);

a=_mm_malloc(n*sizeof(char),32);
ind=_mm_malloc(n*sizeof(int),32);    

                                    /* Generate a pseudo random array a.                     */
j=73659343;                   
for (i=0;i<n;i++){
   j=j*653+1;
   k=(j & 0x3FF00)>>8;              /* k is a pseudo random number between 0 and 1023        */
   if (k<d){
      a[i] = (j&0xFE)+1;            /* Set a[i] to nonzero.                                  */
   }else{
      a[i] = 0;
   }

}

/* for (i=0;i<n;i++){if (a[i]!=0){printf("i=%d,   a[i]=%u\n",i,a[i]);}} printf("\n");   */ /* Uncomment this line to print the nonzeros in a. */ 

char txt0[]="arr2ind_movmsk: ";
char txt1[]="arr2ind_pext:   ";
char txt2[]="arr2ind_if:     ";
char txt3[]="arr2ind_cmov:   ";

nrep=10000;                                   /* Repeat a function nrep times to make relatively accurate timings possible.                          */
                                              /* With nrep=1000000:   ./a.out 10016 4 ;   ./a.out 10016 48 ;   ./a.out 10016 519                    */
                                              /* With nrep=10000:     ./a.out 1000000 5 ; ./a.out 1000000 52 ; ./a.out 1000000 513                  */
printf("nrep = \%d  \n\n",nrep);
arr2ind_movmsk(a,n,ind,&m);                   /* Make sure that the arrays a and ind are read and/or written at least one time before benchmarking. */
for (meth=0;meth<4;meth++){
   t0=omp_get_wtime();
   switch (meth){
      case 0: for(i=0;i<nrep;i++) arr2ind_movmsk(a,n,ind,&m);         strcpy(txt,txt0); break;
      case 1: for(i=0;i<nrep;i++) arr2ind_pext(a,n,ind,&m);           strcpy(txt,txt1); break;
      case 2: for(i=0;i<nrep;i++) arr2ind_if(a,n,ind,&m);             strcpy(txt,txt2); break;
      case 3: for(i=0;i<nrep;i++) arr2ind_cmov(a,n,ind,&m);           strcpy(txt,txt3); break;
      default: ;
   }
   t1=omp_get_wtime();
   printf("method = %s  ",txt);
   /* print_chk(a,ind,m);  */
   printf(" elapsed time = %6.2f\n",t1-t0);
}
print_nonz(a, ind, 2);                                            /* Do something with the results                 */
printf("density = %f %% \n\n",((double)m)/((double)n)*100);       /* Actual nonzero density of array a.            */

/* print_nonz(a, ind, m);    */  /* Uncomment this line to print the indices of the nonzeros.                      */

return 0;
}

/*
With nrep=1000000:
 ./a.out 10016 4 ;    ./a.out 10016 4 ;    ./a.out 10016 48 ;    ./a.out 10016 48 ;    ./a.out 10016 519  ;    ./a.out 10016 519       
With nrep=10000:  
 ./a.out 1000000 5 ;  ./a.out 1000000 5 ;  ./a.out 1000000 52 ;  ./a.out 1000000 52 ;  ./a.out 1000000 513 ;   ./a.out 1000000 513     
*/


The code was tested with array sizes n=10016 (the data fits in the L1 cache) and n=1000000, and with nonzero densities of about 0.5%, 5% and 50%. For accurate timing, the functions were called 1000000 and 10000 times, respectively.


Time in seconds, size n=10016, 1e6 function calls, Intel Core i5-6500.
                     0.53%        5.1%       50.0%
arr2ind_movmsk:       0.27        0.53        4.89
arr2ind_pext:         1.44        1.59        1.45
arr2ind_if:           5.93        8.95       33.82
arr2ind_cmov:         6.82        6.83        6.82

Time in seconds, size n=1000000, 1e4 function calls.

                     0.49%        5.1%       50.1%
arr2ind_movmsk:       0.57        2.03        5.37
arr2ind_pext:         1.47        1.47        1.46
arr2ind_if:           5.88        8.98       38.59
arr2ind_cmov:         6.82        6.81        6.81


In these examples the vectorized loops are faster than the scalar loops. The performance of arr2ind_movmsk depends strongly on the density of a: it is only faster than arr2ind_pext if the density is sufficiently small. The break-even point also depends on the array size n. Function arr2ind_if clearly suffers from branch mispredictions at 50% nonzero density.
