AVX或SSE上的水平尾随最大值 [英] Horizontal trailing maximum on AVX or SSE

查看:100
本文介绍了AVX或SSE上的水平尾随最大值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个由16位值组成的__m256i寄存器,我想获取每个尾随元素的最大值为零.

I have an __m256i register consisting of 16bit values and I want to get the maximum values on each trailing element which are zeroes.

举个例子:

input:  1 0 0 3 0 0 4 5 0 0 0 0 4 3 0 2
output: 1 1 1 3 3 3 4 5 5 5 5 5 4 3 3 2

在AVX或AVX架构上是否有任何有效的方法?也许log(16)= 4次迭代?

Are there any efficient way of doing this on AVX or AVX architecture? Maybe with log(16) = 4 iterations?

添加: 对于其中包含8个uint_16的128位数字的任何解决方案也将受到赞赏.

Addition: Any solution on 128 bit numbers with 8 uint_16's in it are appreciated also.

推荐答案

您确实可以在log_2(SIMD_width)步骤中执行此操作.想法是将输入向量x_vec移两个字节.然后我们混合 x_vec具有移位的矢量,使得x_vec被移位的矢量替换,但仅在x_vec的零位置. 以4、8和16字节的移位重复此过程.您可以在代码中取消对printf -s的注释,以查看x_vecx_trail之间的情况.

You can do this in log_2(SIMD_width) steps indeed. The idea is to shift the input vector x_vec two bytes. Then we blend x_vec with the shifted vector such that x_vec is replaced by the shifted vector, but only at the zero positions of x_vec. This process is repeated with shifts of 4, 8, and 16 bytes. You can uncomment the printf-s in the code to see what happens between x_vec and x_trail.

#include <stdio.h>
#include <x86intrin.h>
/*  gcc -O3 -Wall -m64 -march=broadwell -falign-loops=16 horz_trail_max.c   */
int print_vec_short(__m256i x);

__m256i hor_tr_max(__m256i x_vec){
    __m256i  zero         = _mm256_setzero_si256();
    __m256i  pshufb_cnst  = _mm256_set_epi64x(0x8080808080808080,0x8080808080808080,0x0F0E0F0E0F0E0F0E,0x0F0E0F0E0F0E0F0E);   

    __m256i  mask1        = _mm256_cmpeq_epi16(x_vec,zero);
    __m256i  t1           = _mm256_slli_si256(x_vec,2);                 /* _mm256_slli_si256() doesn't cross the 128b lanes          */
    __m256i  t2           = _mm256_blendv_epi8(x_vec,t1,mask1);

    __m256i  mask3        = _mm256_cmpeq_epi16(t2,zero);
    __m256i  t3           = _mm256_slli_si256(t2,4);
    __m256i  t4           = _mm256_blendv_epi8(t2,t3,mask3);

    __m256i  mask5        = _mm256_cmpeq_epi16(t4,zero);
    __m256i  t5           = _mm256_slli_si256(t4,8);
    __m256i  t6           = _mm256_blendv_epi8(t4,t5,mask5);

    __m256i  mask7        = _mm256_cmpeq_epi16(t6,zero);
    __m256i  t7_0         = _mm256_shuffle_epi8(t6,pshufb_cnst);        /* _mm256_slli_si256() doesn't cross the 128b boundaries. Therefore we need a shuffle and a permute here. */
    __m256i  t7_1         = _mm256_permute2x128_si256(t7_0,t7_0,0x01);  /* t7_1={t6[7], t6[7],...,t6[7],  0,0,0,0, 0,0,0,0}                                                      */
    __m256i  x_trail      = _mm256_blendv_epi8(t6,t7_1,mask7);

/* Uncomment the next few lines to print the values of the intermediate variables */ 
/*
    printf("\n15...0     =   15   14   13   12     11   10    9    8      7    6    5    4       3    2    1    0\n");
    printf("x_vec      = ");print_vec_short(x_vec      );printf("mask1      = ");print_vec_short(mask1      );
    printf("t1         = ");print_vec_short(t1         );printf("t2         = ");print_vec_short(t2         );
    printf("mask3      = ");print_vec_short(mask3      );printf("t3         = ");print_vec_short(t3         );
    printf("t4         = ");print_vec_short(t4         );printf("mask5      = ");print_vec_short(mask5      );
    printf("t5         = ");print_vec_short(t5         );printf("t6         = ");print_vec_short(t6         );
    printf("mask7      = ");print_vec_short(mask7      );printf("t7_0       = ");print_vec_short(t7_0       );
    printf("t7_1       = ");print_vec_short(t7_1       );printf("x_trail    = ");print_vec_short(x_trail    );printf("\n");
*/
    return x_trail;
}


int hor_tr_max_n(short int * x_in, short int * x_out, int n){
    __m256i  minus_1       = _mm256_set1_epi8(-1);
    __m256i  zero          = _mm256_setzero_si256();
    __m256i  pshufb_cnst   = _mm256_set_epi64x(0x8080808080808080,0x8080808080808080,0x0F0E0F0E0F0E0F0E,0x0F0E0F0E0F0E0F0E);   
    int      indx_last_nz  = 0;
    for (int i=0;i<n;i=i+16){
        __m256i  x_vec        = _mm256_load_si256((__m256i*)&x_in[i]);

        __m256i  mask1        = _mm256_cmpeq_epi16(x_vec,zero);
        __m256i  t1           = _mm256_slli_si256(x_vec,2);
        __m256i  t2           = _mm256_blendv_epi8(x_vec,t1,mask1);
        __m256i  mask3        = _mm256_cmpeq_epi16(t2,zero);
        __m256i  t3           = _mm256_slli_si256(t2,4);
        __m256i  t4           = _mm256_blendv_epi8(t2,t3,mask3);
        __m256i  mask5        = _mm256_cmpeq_epi16(t4,zero);
        __m256i  t5           = _mm256_slli_si256(t4,8);
        __m256i  t6           = _mm256_blendv_epi8(t4,t5,mask5);
        __m256i  mask7        = _mm256_cmpeq_epi16(t6,zero);
        __m256i  t7_0         = _mm256_shuffle_epi8(t6,pshufb_cnst);
        __m256i  t7_1         = _mm256_permute2x128_si256(t7_0,t7_0,0x01);
        __m256i  x_trail      = _mm256_blendv_epi8(t6,t7_1,mask7);

        __m256i  isnonzero    = _mm256_xor_si256(mask1,minus_1);                                                                                   
        int      mvmsk_nonz   = _mm256_movemask_epi8(isnonzero);                                                                                   
        int      lz_x_vec     = _lzcnt_u32( mvmsk_nonz ) >>1;                                                                                      
        __m256i  x_last_nz    = _mm256_broadcastw_epi16(_mm_load_si128((__m128i*)&x_in[indx_last_nz]));                                                               
                 indx_last_nz = mvmsk_nonz ? (i+15-lz_x_vec) : indx_last_nz;                                                                       

        __m256i  x_tr_is_zero = _mm256_cmpeq_epi16(x_trail,zero);
        __m256i  x_trail_upd  = _mm256_blendv_epi8(x_trail,x_last_nz,x_tr_is_zero);   

                                _mm256_store_si256((__m256i*)&x_out[i],x_trail_upd);
    }
    return 0;
}


int main() {
#define test 0

#if test == 0
    printf("Test 0: test functionality\n");
    short   x[16] = {1, 0, 0, 3, 0, 0, 4, 5, 0, 0, 0, 0, 4, 3, 0, 2};
//    short   x[16] = {0, 0, 0, 3, 0, 0, 4, 5, 0, 0, 0, 0, 4, 3, 0, 2};
//    short   x[16] = {1, 0, 0, 3, 0, 0, 4000, 0, 0, 0, 10, 0, 0, 3, 0, 2};
//    short   x[16] = {1100, 0, 0, 0, 0, 0, 0, 0,    0, 0, 0, 0, 5000, 0, 0, 0};
//    short   x[16] = {1100, 0, 0, 0, 0, 0, 0, 8888,    0, 0, 0, 0, 5000, 0, 0, 0};

    printf("\n15...0     =   15   14   13   12     11   10    9    8      7    6    5    4       3    2    1    0\n");
    __m256i  x_vec        = _mm256_loadu_si256((__m256i*)x);
    printf("x_vec      = ");print_vec_short(x_vec        );
    __m256i  x_trail      = hor_tr_max(x_vec);
    printf("x_trail    = ");print_vec_short(x_trail      );

#elif test == 1  || test == 2 
    int i, i_o, k;
    int n = 8000;
    int d = 50;
    short int *x_in;
    short int *x_out;
    x_in  = _mm_malloc(n*sizeof(short int),32);
    x_out = _mm_malloc(n*sizeof(short int),32);
    int j = 73659343;                            /* Generate some a pseudo random array a.                                               */
    for (i = 0;i < n;i++){
       j = j*653+1;                             
       k = (j & 0x3FF00)>>8;                     /* k is a pseudo random number between 0 and 1023                                       */
       if (k < d){                               /* with a small d, x_in has many zeros, try e.g. d=6, d=60 and d=600                     */         
          x_in[i] = (j&0xFFE)+1-2048;            /* Set x_in[i] to some nonzero.                                                              */
       }else{
          x_in[i] = 0;
       }
    }
#endif   

#if test == 1
    printf("Test 1: test performance for short int arrays of size n. Use:  perf stat -d ./a.out \n");
    for (i_o=0;i_o<400000;i_o++){                              /* The compiler should not interchange the inner and outer loop after function inlining, check compiler output (-S). */
        hor_tr_max_n(x_in,x_out,n);
    }        

#elif test == 2
    printf("Test 2: test performance of the unrolled scalar loop for short int arrays of size n. Use:  perf stat -d ./a.out\n");
    short int prev_x  = 0;
    for (i_o=0;i_o<400000;i_o++){                              /* The compiler should not interchange the inner and outer loop, check compiler output (-S). */
        for (i=0;i<n;i=i+4){
            short int x_in_i0 = x_in[i];                                                     
            short int x_in_i1 = x_in[i+1];                                                   
            short int x_in_i2 = x_in[i+2];                                                   
            short int x_in_i3 = x_in[i+3];                                                   
                      prev_x  = (x_in_i0)?(x_in_i0):(prev_x);  x_out[i]   = prev_x;           
                      prev_x  = (x_in_i1)?(x_in_i1):(prev_x);  x_out[i+1] = prev_x;           
                      prev_x  = (x_in_i2)?(x_in_i2):(prev_x);  x_out[i+2] = prev_x;           
                      prev_x  = (x_in_i3)?(x_in_i3):(prev_x);  x_out[i+3] = prev_x;           
        }
    }        

#elif test == 3
    printf("Test 3: Estimate approximately the latency and throughput of hor_tr_max with: perf stat -d ./a.out \n");
    int i;
    short   x0[16] = {1, 0, 0, 3, 0, 0, 4, 5, 0, 0, 0, 0, 4, 3, 0, 2};
    short   x1[16] = {0, 0, 0, 3, 0, 12, 4, 5, 0, 0, 0, 0, 4, 3, 0, 2};
    short   x2[16] = {1, 0, 0, 3, 0, 0, 4, 5, 0, 0, 10, 0, 4, 3, 0, 2};
    short   x3[16] = {110, 0, 0, 1113, 0, 0, 4, 5, 0, 0, 0, 0, 4000, 3, 0, 2};
    short   x4[16] = {110, 4, 0, 1113, 0, 0, 4, 5, 0, 7, 0, 0, 4000, 3, 0, 2};

    __m256i  x_vec0   = _mm256_loadu_si256((__m256i*)x0); printf("x_vec0 = ");print_vec_short(x_vec0); __m256i x_trail0 = hor_tr_max(x_vec0);                                  
    __m256i  x_vec1   = _mm256_loadu_si256((__m256i*)x1); printf("x_vec1 = ");print_vec_short(x_vec1); __m256i x_trail1 = hor_tr_max(x_vec1);                                
    __m256i  x_vec2   = _mm256_loadu_si256((__m256i*)x2); printf("x_vec2 = ");print_vec_short(x_vec2); __m256i x_trail2 = hor_tr_max(x_vec2);                                
    __m256i  x_vec3   = _mm256_loadu_si256((__m256i*)x3); printf("x_vec3 = ");print_vec_short(x_vec3); __m256i x_trail3 = hor_tr_max(x_vec3);                                  
    __m256i  x_vec4   = _mm256_loadu_si256((__m256i*)x4); printf("x_vec4 = ");print_vec_short(x_vec4); __m256i x_trail4 = hor_tr_max(x_vec4);                                  

    for(i=0;i<100000000;i++){
             x_trail0      = hor_tr_max(x_trail0);    /* Use this line for latency testing, uncomment next 4 lines for throughput testing */
//             x_trail1      = hor_tr_max(x_trail1);
//             x_trail2      = hor_tr_max(x_trail2);
//             x_trail3      = hor_tr_max(x_trail3);
//             x_trail4      = hor_tr_max(x_trail4);
             }
    printf("x_trail0 = ");print_vec_short(x_trail0 );
    printf("x_trail1 = ");print_vec_short(x_trail1 );
    printf("x_trail2 = ");print_vec_short(x_trail2 );
    printf("x_trail3 = ");print_vec_short(x_trail3 );
    printf("x_trail4 = ");print_vec_short(x_trail4 );
#endif   

#if test == 1  || test == 2  
    for (i=0;i<400;i++){
        printf("%6i %6hi  %6hi\n",i,x_in[i],x_out[i]);
    }
#endif   

    return 0;
}

int print_vec_short(__m256i x){
    short int v[16];
    _mm256_storeu_si256((__m256i *)v,x);
    printf("%4hi %4hi %4hi %4hi | %4hi %4hi %4hi %4hi | %4hi %4hi %4hi %4hi  | %4hi %4hi %4hi %4hi\n",
           v[15],v[14],v[13],v[12],v[11],v[10],v[9],v[8],v[7],v[6],v[5],v[4],v[3],v[2],v[1],v[0]);
    return 0;
}

输出为:

15...0     =   15   14   13   12     11   10    9    8      7    6    5    4       3    2    1    0
x_vec      =    2    0    3    4 |    0    0    0    0 |    5    4    0    0  |    3    0    0    1
x_trail    =    2    3    3    4 |    5    5    5    5 |    5    4    3    3  |    3    1    1    1


此功能hor_tr_max的延迟和吞吐量约为14.2和6.4个周期(英特尔Skylake Core i5-6500). 请注意,标准的稍微展开的标量循环例如:

This function hor_tr_max has a latency and throughput of approximately 14.2 and 6.4 cycles (Intel Skylake Core i5-6500). Note that a standard slightly unrolled scalar loop such as:

short int prev_x  = 0;
for (i=0;i<n;i=i+4){
    short int x_in_i0 = x_in[i];                                                     
    short int x_in_i1 = x_in[i+1];                                                   
    short int x_in_i2 = x_in[i+2];                                                   
    short int x_in_i3 = x_in[i+3];                                                   
              prev_x  = (x_in_i0)?(x_in_i0):(prev_x);  x_out[i]   = prev_x;           
              prev_x  = (x_in_i1)?(x_in_i1):(prev_x);  x_out[i+1] = prev_x;           
              prev_x  = (x_in_i2)?(x_in_i2):(prev_x);  x_out[i+2] = prev_x;           
              prev_x  = (x_in_i3)?(x_in_i3):(prev_x);  x_out[i+3] = prev_x;           
}

每个short int

大约需要1.26个周期,即每16个short int -s需要20.2个周期.因此,向量化是 在这里有利可图.

takes about 1.26 cycle per short int, which is 20.2 cycles per 16 short int-s. So, vectorization is profitable here.


大小为n的数组的水平尾随最大值

我们还可以使用hor_tr_max来计算大小为n的数组的水平尾随最大值,其中n远大于16. 但是,需要步骤i的输出来计算下一步.此循环携带的依赖项导致代码性能低下. 在上面的代码中,函数hor_tr_max_n实现的方法略有不同,从而使依赖关系链更短,这是有益的,因为 乱序调度.

We can use hor_tr_max also to compute the horizontal trailing maximum of an array of size n, with n much larger than 16. However, the output of step i is needed to compute the next step. This loop carried dependence results in a low performance of the code. Function hor_tr_max_n, in the code above, implements a slightly different method that makes the dependency chain shorter, which is beneficial due to out of order scheduling.

功能hor_tr_max_n每16个short int的成本为12.2个周期,这比展开的成本低40% 标量循环.

Function hor_tr_max_n costs 12.2 cycles per 16 short ints, which is about 40 percent less than the unrolled scalar loop.

使用即将推出的Skylake-SP处理器,水平尾随最大值"的矢量化可能会 由于矢量寄存器更宽,因此利润更高.

It is likely that with the forthcoming Skylake-SP processor, vectorization of the 'horizontal trailing maximum' will be even more profitable, due to wider vector registers.

这篇关于AVX或SSE上的水平尾随最大值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆