如何优化间接基数排序? (也就是如何优化不可预测的内存访问模式) [英] How to optimize an indirect radix sort? (a.k.a. how to optimize unpredictable memory access patterns)

查看:425
本文介绍了如何优化间接基数排序? (也就是如何优化不可预测的内存访问模式)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在C ++中编写了一个间接基数排序算法(通过间接,我的意思是它返回项目的索引):

  #include< algorithm> 
#include< iterator>
#include< vector>

template< class It1,class It2>
void radix_ipass(
It1 begin,It1 const end,
It2 const a,size_t const i,
std :: vector< std :: vector< size_t>& buckets)
{
size_t ncleared = 0;
for(It1 j = begin; j!= end; ++ j)
{
size_t const k = a [* j] [i]
while(k> = ncleared& ncleared< buckets.size())
{buckets [ncleared ++]。 }
if(k> = buckets.size())
{
buckets.resize(k + 1);
ncleared = buckets.size();
}
buckets [k] .push_back(size_t());
using std :: swap; swap(buckets [k] .back(),* j);
}
for(std :: vector< std :: vector< size_t>> :: iterator
j = buckets.begin(); j!= buckets.begin()+ ncleared; j-> clear(),++ j)
{
begin = std :: swap_ranges(j-> begin(),j-> end(),begin);
}
}

template< class It,class It2>
void radix_isort(it const begin,It const end,It2 const items)
{
for(ptrdiff_t i = 0; i!= end - begin; ++ i){items [i ] = i; }
size_t smax = 0;
for(It i = begin; i!= end; ++ i)
{
size_t const n = i-> size();
smax = n> smax? n:smax;
}
std :: vector< std :: vector< size_t> >桶;
for(size_t i = 0; i!= smax; ++ i)
{
radix_ipass(
items,items +(end - begin),
,smax-i-1,bucket);
}
}

似乎执行速度比 std :: sort 当我用下面的代码测试它(3920毫秒相比6530毫秒):

  #include< functional> 

template< class Key>
struct key_comp:public Key
{
explicit key_comp(Key const& key = Key()):Key(key){}
template< class T&
bool operator()(T const& a,T const& b)const
{return this-> Key :: operator this-> Key :: operator()(b); }
};

template< class Key>
key_comp< Key> make_key_comp(Key const& key){return key_comp< Key>(key); }

template< class T1,class T2>
struct add:public std :: binary_function< T1,T2,T1>
{T1 operator()(T1 a,T2 const& b)const {return a + = b; }};

template< class F>
struct deref:public F
{
deref(F const& f):F(f){}
typename std :: iterator_traits<
typename F :: result_type
> :: value_type const
& operator()(typename F :: argument_type const& a)const
{return * this-> ; F :: operator()(a); }
};

template< class T> deref< T> make_deref(T const& t){return deref T(t); }

size_t xorshf96(void)//随机数生成器
{
static size_t x = 123456789,y = 362436069,z = 521288629;
x ^ = x<< 16;
x ^ = x>> 5;
x ^ = x<< 1;
size_t t = x;
x = y;
y = z
z = t ^ x ^ y;
return z;
}

#include< stdio.h>
#include< time.h>

#include< array>

int main(void)
{
typedef std :: vector< std :: array< size_t,3& >项目;
项目项目(1<< 24);
std :: vector< size_t> rankks(items.size()* 2);
for(size_t i = 0; i!= items.size(); i ++)
{
ranks [i] = i;
for(size_t j = 0; j!= items [i] .size(); j ++)
{items [i] [j] = xorshf96()& 0xFFF; }
}
clock_t const start = clock();
if(1){radix_isort(items.begin(),items.end(),ranks.begin()); }
else // STL排序
{
std :: sort(
ranks.begin(),
ranks.begin()+ items.size
make_key_comp(make_deref(std :: bind1st(
add< Items :: const_iterator,ptrdiff_t>(),
items.begin()))));
}
printf(%u ms\\\

(unsigned)((clock() - start)* 1000 / CLOCKS_PER_SEC),
std :: min ranks.begin(),ranks.end()));
return 0;
}

嗯,我想这是我能做的最好的, p>

但是经过大量的碰撞墙后,我意识到在 radix_ipass 开头预取可以帮助减少结果为 1440 ms (!):

  #include< xmmintrin.h& 

...

for(It1 j = begin; j!= end; ++ j)
{
#if defined(_MM_TRANSPOSE4_PS) //如果包含xmmintrin.h,应该定义
enum {N = 8};
if(end-j> N)
{_mm_prefetch((char const *)(& a [j [N]] [i]),_MM_HINT_T0); }
#endif
...
}

瓶颈是内存带宽---访问模式是不可预测的。



现在我的问题是:我还能做些什么,数据?



还是没有多少空间可以改进?



(我希望避免妥协)

解决方案

如果可能的话,使用更多的可读性组合排名和值的紧凑数据结构可以将因子2-3提高 std :: sort 的性能。基本上,排序现在在向量< pair< Value,Rank>> 上运行。 Value 数据类型, std :: array< integer_type,3> code> pair< uint32_t,uint8_t> 的数据结构。只有它的一半字节未被使用,并且< 比较可以通过两个步骤完成,首先使用 uint32_t s(不清楚是否可以将 std :: array< ..> :: operator< 使用的循环优化为类似的快速代码,但是通过该数据结构来替换 std :: array< integer_type,3> 会提高另一个性能)。



仍然,它没有得到与基数排序一样高效。



除了额外的排序方法,我已经替换了 xorshf96

可以通过两个命令行参数更改值的数量:首先是种子,然后是计数。



使用g ++ 4.9.0 20131022编译,使用 -std = c ++ 11 -march = native -O3 ,对于64位linux



重要笔记在Core2Duo处理器U9400(旧&慢!)上运行

 
项目数:16000000
使用std :: sort
持续时间:12260 ms
结果排序:true

种子:5648
项目数:16000000
使用std :: sort
duration:12230 ms
result sorted:true

种子:5648
项目数:16000000
使用std :: sort
duration :12230 ms
结果排序:true


种子:5648
项目数:16000000
使用带有打包数据结构的std :: sort
duration:4290 ms
结果排序:true

种子:5648
项目数:16000000
使用带有打包数据结构的std :: sort
持续时间:4270 ms
结果排序:true

种子:5648
项目数:16000000
使用带有打包数据结构的std :: sort
持续时间:4280 ms
结果排序:真


项目数:16000000
使用基数排序
持续时间:3790 ms
结果排序: true

种子:5648
项目数:16000000
使用基数排序
持续时间:3820 ms
结果排序:true
$ b b种子:5648
项目数:16000000
使用基数排序
持续时间:3780 ms
结果排序:true

新的或更改的代码:

  template< class It> 
struct fun_obj
{
It beg;
bool operator()(ptrdiff_t lhs,ptrdiff_t rhs)
{
return beg [lhs]< ;
}
};

template< class It>
fun_obj< it> make_fun_obj(it beg)
{
return fun_obj< it> {beg};
}

struct uint32p8_t
{
uint32_t m32;
uint8_t m8;

uint32p8_t(std :: array< uint16_t,3> const& a)
:m32(a [0]<<(32-3 * 4)| a [1] <<(32-2 * 3 * 4)|(a [2]& 0xF00)> 8)
,m8(a [2]& 0xFF)
{
}

operator std :: array< size_t,3>()const
{
return {{m32& 0xFFF00000> (32-3 * 4),m32& 0x000FFF0>> (32-2 * 3 * 4),
(m32& 0xF)<< 8 | m8}};
}

friend bool operator<(uint32p8_t const& lhs,uint32p8_t const& rhs)
{
if(lhs.m32 if(lhs.m32> rhs.m32)return false;
return lhs.m8< rhs.m8;
}
};

#include< stdio.h>
#include< time.h>

#include< array>
#include< iostream>
#include< iomanip>
#include< utility>
#include< algorithm>
#include< cstdlib>
#include< iomanip>
#include< random>

int main(int argc,char * argv [])
{
std :: cout.sync_with_stdio(false);

constexpr auto items_count_default = 2<< 22;
constexpr auto seed_default = 42;

uint32_t const seed = argc> 1? std :: atoll(argv [1]):seed_default;
std :: cout<< seed:<<种子< \\\
;

size_t const items_count = argc> 2。 std :: atoll(argv [2])
:items_count_default;
std :: cout<< item count:< items_count<< \\\
;

使用Items_array_value_t =
#ifdef RADIX_SORT
size_t
#elif defined(STDSORT)
uint16_t
#elif defined(STDSORT_PACKED)
uint16_t
#endif
;

typedef std :: vector< std :: array< Items_array_value_t,3> >项目;
项目项目(items_count);

auto const ranks_count =
#ifdef RADIX_SORT
items.size()* 2
#elif defined(STDSORT)
items.size()
#elif defined(STDSORT_PACKED)
items.size()
#endif
;

// auto prng = xorshf96;
std :: mt19937 gen(seed);
std :: uniform_int_distribution<> dist;
auto prng = [& dist,& gen] {return dist(gen);};

std :: vector< size_t> rank(ranks_count);
for(size_t i = 0; i!= items.size(); i ++)
{
ranks [i] = i;

for(size_t j = 0; j!= items [i] .size(); j ++)
{items [i] [j] = prng()& 0xFFF; }
}

std :: cout<< using;
clock_t const start = clock();
#ifdef RADIX_SORT
std :: cout<< radix sort \\\
;
radix_isort(items.begin(),things.end(),ranks.begin());
#elif defined(STDSORT)
std :: cout<< std :: sort\\\
;
std :: sort(ranks.begin(),ranks.begin()+ items.size(),
make_fun_obj(items.cbegin())
// make_key_comp :: bind1st(
// add< Items :: const_iterator,ptrdiff_t>(),
// items.begin())))
);
#elif defined(STDSORT_PACKED)
std :: cout<< std :: sort with a packed data structure\\\
;
使用Items_ranks = std :: vector< std :: pair< uint32p8_t,
decltype(ranks):: value_type> > ;;
Items_ranks items_ranks;

size_t i = 0;
for(auto iI = items.cbegin(); iI!= items.cend(); ++ iI,++ i)
{
items_ranks.emplace_back(* iI,i) ;
}

std :: sort(begin(items_ranks),end(items_ranks),
[](Items_ranks :: value_type const& lhs,
Items_ranks :: value_type const& rhs)
{return lhs.first< rhs.first;}
);
std :: transform(items_ranks.cbegin(),items_ranks.cend(),begin(ranks),
[](Items_ranks :: value_type const& e){return e.second;}
);
#endif
auto const duration =(clock() - start)/(CLOCKS_PER_SEC / 1000);

bool const sorted = std :: is_sorted(ranks.begin(),ranks.begin()+ items.size(),
make_fun_obj(items.cbegin()));

std :: cout<< duration:<持续时间< ms\\\

<< result sorted:< std :: boolalpha< sort<< \\\
;

return 0;
}

完整代码:

  #include< algorithm> 
#include< iterator>
#include< vector>

#include< cstddef>
using std :: size_t;
using std :: ptrdiff_t;

#include< xmmintrin.h>

template< class It1,class It2>
void radix_ipass(
It1 begin,It1 const end,
It2 const a,size_t const i,
std :: vector< std :: vector< size_t>& buckets)
{
size_t ncleared = 0;
for(It1 j = begin; j!= end; ++ j)
{
#if defined(_MM_TRANSPOSE4_PS)
constexpr auto N = 8;
if(end-j> N)
{_mm_prefetch((char const *)(& a [j [N]] [i]),_MM_HINT_T0); }
#else
#error SS intrinsic not found
#endif

size_t const k = a [* j] [i];
while(k> = ncleared& ncleared< buckets.size())
{buckets [ncleared ++]。 }
if(k> = buckets.size())
{
buckets.resize(k + 1);
ncleared = buckets.size();
}
buckets [k] .push_back(size_t());
using std :: swap; swap(buckets [k] .back(),* j);
}
for(std :: vector< std :: vector< size_t>> :: iterator
j = buckets.begin(); j!= buckets.begin()+ ncleared; j-> clear(),++ j)
{
begin = std :: swap_ranges(j-> begin(),j-> end(),begin);
}
}

template< class It,class It2>
void radix_isort(it const begin,It const end,It2 const items)
{
for(ptrdiff_t i = 0; i!= end - begin; ++ i){items [i ] = i; }
size_t smax = 0;
for(It i = begin; i!= end; ++ i)
{
size_t const n = i-> size();
smax = n> smax? n:smax;
}
std :: vector for(size_t i = 0; i!= smax; ++ i)
{
radix_ipass(
items,items +(end - begin),
,smax-i-1,bucket);
}
}

#include< functional>

template< class Key>
struct key_comp:public Key
{
explicit key_comp(Key const& key = Key()):Key(key){}
template< class T&
bool operator()(T const& a,T const& b)const
{return this-> Key :: operator this-> Key :: operator()(b); }
};

template< class Key>
key_comp< Key> make_key_comp(Key const& key){return key_comp< Key>(key); }

template< class T1,class T2>
struct add:public std :: binary_function< T1,T2,T1>
{T1 operator()(T1 a,T2 const& b)const {return a + = b; }};

template< class F>
struct deref:public F
{
deref(F const& f):F(f){}
typename std :: iterator_traits<
typename F :: result_type
> :: value_type const
& operator()(typename F :: argument_type const& a)const
{return * this-> ; F :: operator()(a); }
};

template< class T> deref< T> make_deref(T const& t){return deref T(t); }

size_t xorshf96(void)//随机数生成器
{
static size_t x = 123456789,y = 362436069,z = 521288629;
x ^ = x<< 16;
x ^ = x>> 5;
x ^ = x<< 1;
size_t t = x;
x = y;
y = z;
z = t ^ x ^ y;
return z;
}

template< class It>
struct fun_obj
{
It beg;
bool operator()(ptrdiff_t lhs,ptrdiff_t rhs)
{
return beg [lhs]< ;
}
};

template< class It>
fun_obj< it> make_fun_obj(it beg)
{
return fun_obj< it> {beg};
}

struct uint32p8_t
{
uint32_t m32;
uint8_t m8;

uint32p8_t(std :: array< uint16_t,3> const& a)
:m32(a [0]<<(32-3 * 4)| a [1] <<(32-2 * 3 * 4)|(a [2]& 0xF00)> 8)
,m8(a [2]& 0xFF)
{
}

operator std :: array< size_t,3>()const
{
return {{m32& 0xFFF00000> (32-3 * 4),m32& 0x000FFF0>> (32-2 * 3 * 4),
(m32& 0xF)<< 8 | m8}};
}

friend bool operator<(uint32p8_t const& lhs,uint32p8_t const& rhs)
{
if(lhs.m32 if(lhs.m32> rhs.m32)return false;
return lhs.m8< rhs.m8;
}
};

#include< stdio.h>
#include< time.h>

#include< array>
#include< iostream>
#include< iomanip>
#include< utility>
#include< algorithm>
#include< cstdlib>
#include< iomanip>
#include< random>

int main(int argc,char * argv [])
{
std :: cout.sync_with_stdio(false);

constexpr auto items_count_default = 2<< 22;
constexpr auto seed_default = 42;

uint32_t const seed = argc> 1? std :: atoll(argv [1]):seed_default;
std :: cout<< seed:<<种子< \\\
;
size_t const items_count = argc> 2。 std :: atoll(argv [2]):items_count_default;
std :: cout<< item count:<< items_count<< \\\
;

使用Items_array_value_t =
#ifdef RADIX_SORT
size_t
#elif defined(STDSORT)
uint16_t
#elif defined(STDSORT_PACKED)
uint16_t
#endif
;

typedef std :: vector< std :: array< Items_array_value_t,3> >项目;
项目项目(items_count);

auto const ranks_count =
#ifdef RADIX_SORT
items.size()* 2
#elif defined(STDSORT)
items.size()
#elif defined(STDSORT_PACKED)
items.size()
#endif
;

// auto prng = xorshf96;
std :: mt19937 gen(seed);
std :: uniform_int_distribution<> dist;
auto prng = [& dist,& gen] {return dist(gen);};

std :: vector< size_t> rank(ranks_count);
for(size_t i = 0; i!= items.size(); i ++)
{
ranks [i] = i;

for(size_t j = 0; j!= items [i] .size(); j ++)
{items [i] [j] = prng()& 0xFFF; }
}

std :: cout<< using;
clock_t const start = clock();
#ifdef RADIX_SORT
std :: cout<< radix sort \\\
;
radix_isort(items.begin(),items.end(),ranks.begin());
#elif defined(STDSORT)
std :: cout<< std :: sort\\\
;
std :: sort(ranks.begin(),ranks.begin()+ items.size(),
make_fun_obj(items.cbegin())
// make_key_comp :: bind1st(
// add< Items :: const_iterator,ptrdiff_t>(),
// items.begin())))
);
#elif defined(STDSORT_PACKED)
std :: cout<< std :: sort with a packed data structure\\\
;
使用Items_ranks = std :: vector< std :: pair< uint32p8_t,
decltype(ranks):: value_type> > ;;
Items_ranks items_ranks;

size_t i = 0;
for(auto iI = items.cbegin(); iI!= items.cend(); ++ iI,++ i)
{
items_ranks.emplace_back(* iI,i) ;
}

std :: sort(begin(items_ranks),end(items_ranks),
[](Items_ranks :: value_type const& lhs,
Items_ranks :: value_type const& rhs)
{return lhs.first< rhs.first;}
);
std :: transform(items_ranks.cbegin(),items_ranks.cend(),begin(ranks),
[](Items_ranks :: value_type const& e){return e.second;}
);
#endif
auto const duration =(clock() - start)/(CLOCKS_PER_SEC / 1000);

bool const sorted = std :: is_sorted(ranks.begin(),ranks.begin()+ items.size(),
make_fun_obj(items.cbegin()));

std :: cout<< duration:<持续时间< ms\\\

<< result sorted:< std :: boolalpha< sort<< \\\
;

return 0;
}


I've written an indirect radix sort algorithm in C++ (by indirect, I mean it returns the indices of the items):

#include <algorithm>
#include <iterator>
#include <vector>

template<class It1, class It2>
void radix_ipass(
    It1 begin, It1 const end,
    It2 const a, size_t const i,
    std::vector<std::vector<size_t> > &buckets)
{
    size_t ncleared = 0;
    for (It1 j = begin; j != end; ++j)
    {
        size_t const k = a[*j][i];
        while (k >= ncleared && ncleared < buckets.size())
        { buckets[ncleared++].clear(); }
        if (k >= buckets.size())
        {
            buckets.resize(k + 1);
            ncleared = buckets.size();
        }
        buckets[k].push_back(size_t());
        using std::swap; swap(buckets[k].back(), *j);
    }
    for (std::vector<std::vector<size_t> >::iterator
        j = buckets.begin(); j != buckets.begin() + ncleared; j->clear(), ++j)
    {
        begin = std::swap_ranges(j->begin(), j->end(), begin);
    }
}

template<class It, class It2>
void radix_isort(It const begin, It const end, It2 const items)
{
    for (ptrdiff_t i = 0; i != end - begin; ++i) { items[i] = i; }
    size_t smax = 0;
    for (It i = begin; i != end; ++i)
    {
        size_t const n = i->size();
        smax = n > smax ? n : smax;
    }
    std::vector<std::vector<size_t> > buckets;
    for (size_t i = 0; i != smax; ++i)
    {
        radix_ipass(
            items, items + (end - begin),
            begin, smax - i - 1, buckets);
    }
}

It seems to perform around 40% faster than std::sort when I test it with the following code (3920 ms compared to 6530 ms):

#include <functional>

template<class Key>
struct key_comp : public Key
{
    explicit key_comp(Key const &key = Key()) : Key(key) { }
    template<class T>
    bool operator()(T const &a, T const &b) const
    { return this->Key::operator()(a) < this->Key::operator()(b); }
};

template<class Key>
key_comp<Key> make_key_comp(Key const &key) { return key_comp<Key>(key); }

template<class T1, class T2>
struct add : public std::binary_function<T1, T2, T1>
{ T1 operator()(T1 a, T2 const &b) const { return a += b; } };

template<class F>
struct deref : public F
{
    deref(F const &f) : F(f) { }
    typename std::iterator_traits<
        typename F::result_type
    >::value_type const
        &operator()(typename F::argument_type const &a) const
    { return *this->F::operator()(a); }
};

template<class T> deref<T> make_deref(T const &t) { return deref<T>(t); }

size_t xorshf96(void)  // random number generator
{
    static size_t x = 123456789, y = 362436069, z = 521288629;
    x ^= x << 16;
    x ^= x >> 5;
    x ^= x << 1;
    size_t t = x;
    x = y;
    y = z;
    z = t ^ x ^ y;
    return z;
}

#include <stdio.h>
#include <time.h>

#include <array>

int main(void)
{
    typedef std::vector<std::array<size_t, 3> > Items;
    Items items(1 << 24);
    std::vector<size_t> ranks(items.size() * 2);
    for (size_t i = 0; i != items.size(); i++)
    {
        ranks[i] = i;
        for (size_t j = 0; j != items[i].size(); j++)
        { items[i][j] = xorshf96() & 0xFFF; }
    }
    clock_t const start = clock();
    if (1) { radix_isort(items.begin(), items.end(), ranks.begin()); }
    else  // STL sorting
    {
        std::sort(
            ranks.begin(),
            ranks.begin() + items.size(),
            make_key_comp(make_deref(std::bind1st(
                add<Items::const_iterator, ptrdiff_t>(),
                items.begin()))));
    }
    printf("%u ms\n",
        (unsigned)((clock() - start) * 1000 / CLOCKS_PER_SEC),
        std::min(ranks.begin(), ranks.end()));
    return 0;
}

Hmm, I guess that's the best I can do, I thought.

But after lots of banging my head against the wall, I realized that prefetching in the beginning of radix_ipass can help cut down the result to 1440 ms (!):

#include <xmmintrin.h>

...

    for (It1 j = begin; j != end; ++j)
    {
#if defined(_MM_TRANSPOSE4_PS)  // should be defined if xmmintrin.h is included
        enum { N = 8 };
        if (end - j > N)
        { _mm_prefetch((char const *)(&a[j[N]][i]), _MM_HINT_T0); }
#endif
        ...
    }

Clearly, the bottleneck is the memory bandwidth---the access pattern is unpredictable.

So now my question is: what else can I do to make it even faster on similar amounts of data?

Or is there not much room left for improvement?

(I'm hoping to avoid compromising the readability of the code if possible, so if the readability is harmed, the improvement should be significant.)

解决方案

Using a more compact data structure that combines ranks and values can boost the performance of std::sort by a factor 2-3. Essentially, the sort now runs on a vector<pair<Value,Rank>>. The Value data type, std::array<integer_type, 3> has been replaced for this by a more compact pair<uint32_t, uint8_t> data structure. Only half a byte of it is unused, and the < comparison can by done in two steps, first using a presumably efficient comparison of uint32_ts (it's not clear if the loop used by std::array<..>::operator< can be optimized to a similarly fast code, but the replacement of std::array<integer_type,3> by this data structure yielded another performance boost).

Still, it doesn't get as efficient as the radix sort. (Maybe you could tweak a custom QuickSort with prefetches?)

Besides that additional sorting method, I've replaced the xorshf96 by a mt19937, because I know how to provide a seed for the latter ;)

The seed and the number of values can be changed via two command-line arguments: first the seed, then the count.

Compiled with g++ 4.9.0 20131022, using -std=c++11 -march=native -O3, for a 64-bit linux

Sample runs; important note running on a Core2Duo processor U9400 (old & slow!)

item count: 16000000
using std::sort
duration: 12260 ms
result sorted: true

seed: 5648
item count: 16000000
using std::sort
duration: 12230 ms
result sorted: true

seed: 5648
item count: 16000000
using std::sort
duration: 12230 ms
result sorted: true


seed: 5648
item count: 16000000
using std::sort with a packed data structure
duration: 4290 ms
result sorted: true

seed: 5648
item count: 16000000
using std::sort with a packed data structure
duration: 4270 ms
result sorted: true

seed: 5648
item count: 16000000
using std::sort with a packed data structure
duration: 4280 ms
result sorted: true


item count: 16000000
using radix sort
duration: 3790 ms
result sorted: true

seed: 5648
item count: 16000000
using radix sort
duration: 3820 ms
result sorted: true

seed: 5648
item count: 16000000
using radix sort
duration: 3780 ms
result sorted: true

New or changed code:

template<class It>
struct fun_obj
{
        It beg;
        bool operator()(ptrdiff_t lhs, ptrdiff_t rhs)
        {
                return beg[lhs] < beg[rhs];
        }
};

template<class It>
fun_obj<It> make_fun_obj(It beg)
{
        return fun_obj<It>{beg};
}

struct uint32p8_t
{
        uint32_t m32;
        uint8_t m8;

        uint32p8_t(std::array<uint16_t, 3> const& a)
                : m32( a[0]<<(32-3*4) | a[1]<<(32-2*3*4) | (a[2]&0xF00)>>8)
                , m8( a[2]&0xFF )
        {
        }

        operator std::array<size_t, 3>() const
        {
                return {{m32&0xFFF00000 >> (32-3*4), m32&0x000FFF0 >> (32-2*3*4),
                         (m32&0xF)<<8 | m8}};
        }

        friend bool operator<(uint32p8_t const& lhs, uint32p8_t const& rhs)
        {
                if(lhs.m32 < rhs.m32) return true;
                if(lhs.m32 > rhs.m32) return false;
                return lhs.m8 < rhs.m8;
        }
};

#include <stdio.h>
#include <time.h>

#include <array>
#include <iostream>
#include <iomanip>
#include <utility>
#include <algorithm>
#include <cstdlib>
#include <iomanip>
#include <random>

int main(int argc, char* argv[])
{
    std::cout.sync_with_stdio(false);

    constexpr auto items_count_default = 2<<22;
    constexpr auto seed_default = 42;

    uint32_t const seed = argc > 1 ? std::atoll(argv[1]) : seed_default;
    std::cout << "seed: " << seed << "\n";

    size_t const items_count = argc > 2 ? std::atoll(argv[2])
                                        : items_count_default;
    std::cout << "item count: " << items_count << "\n";

    using Items_array_value_t =
        #ifdef RADIX_SORT
            size_t
        #elif defined(STDSORT)
            uint16_t
        #elif defined(STDSORT_PACKED)
            uint16_t
        #endif
    ;

    typedef std::vector<std::array<Items_array_value_t, 3> > Items;
    Items items(items_count);

    auto const ranks_count =
        #ifdef RADIX_SORT
            items.size() * 2
        #elif defined(STDSORT)
            items.size()
        #elif defined(STDSORT_PACKED)
            items.size()
    #endif
    ;

    //auto prng = xorshf96;
    std::mt19937 gen(seed);
    std::uniform_int_distribution<> dist;
    auto prng = [&dist, &gen]{return dist(gen);};

    std::vector<size_t> ranks(ranks_count);
    for (size_t i = 0; i != items.size(); i++)
    {
        ranks[i] = i;

        for (size_t j = 0; j != items[i].size(); j++)
        { items[i][j] = prng() & 0xFFF; }
    }

    std::cout << "using ";
    clock_t const start = clock();
    #ifdef RADIX_SORT
        std::cout << "radix sort\n";
        radix_isort(items.begin(), items.end(), ranks.begin());
    #elif defined(STDSORT)
        std::cout << "std::sort\n";
        std::sort(ranks.begin(), ranks.begin() + items.size(),
                  make_fun_obj(items.cbegin())
                  //make_key_comp(make_deref(std::bind1st(
                  //    add<Items::const_iterator, ptrdiff_t>(),
                  //    items.begin())))
                 );
    #elif defined(STDSORT_PACKED)
        std::cout << "std::sort with a packed data structure\n";
        using Items_ranks = std::vector< std::pair<uint32p8_t,
                                         decltype(ranks)::value_type> >;
        Items_ranks items_ranks;

        size_t i = 0;
        for(auto iI = items.cbegin(); iI != items.cend(); ++iI, ++i)
        {
            items_ranks.emplace_back(*iI, i);
        }

        std::sort(begin(items_ranks), end(items_ranks),
                  [](Items_ranks::value_type const& lhs,
                     Items_ranks::value_type const& rhs)
                  { return lhs.first < rhs.first; }
                 );
        std::transform(items_ranks.cbegin(), items_ranks.cend(), begin(ranks),
                       [](Items_ranks::value_type const& e) { return e.second; }
                      );
    #endif
    auto const duration = (clock() - start) / (CLOCKS_PER_SEC / 1000);

    bool const sorted = std::is_sorted(ranks.begin(), ranks.begin() + items.size(),
                                       make_fun_obj(items.cbegin()));

    std::cout << "duration: " << duration << " ms\n"
              << "result sorted: " << std::boolalpha << sorted << "\n";

    return 0;
}

Full code:

#include <algorithm>
#include <iterator>
#include <vector>

#include <cstddef>
using std::size_t;
using std::ptrdiff_t;

#include <xmmintrin.h>

template<class It1, class It2>
void radix_ipass(
    It1 begin, It1 const end,
    It2 const a, size_t const i,
    std::vector<std::vector<size_t> > &buckets)
{
    size_t ncleared = 0;
    for (It1 j = begin; j != end; ++j)
    {
        #if defined(_MM_TRANSPOSE4_PS)
            constexpr auto N = 8;
            if(end - j > N)
            { _mm_prefetch((char const *)(&a[j[N]][i]), _MM_HINT_T0); }
        #else
            #error SS intrinsic not found
        #endif

        size_t const k = a[*j][i];
        while (k >= ncleared && ncleared < buckets.size())
        { buckets[ncleared++].clear(); }
        if (k >= buckets.size())
        {
            buckets.resize(k + 1);
            ncleared = buckets.size();
        }
        buckets[k].push_back(size_t());
        using std::swap; swap(buckets[k].back(), *j);
    }
    for (std::vector<std::vector<size_t> >::iterator
        j = buckets.begin(); j != buckets.begin() + ncleared; j->clear(), ++j)
    {
        begin = std::swap_ranges(j->begin(), j->end(), begin);
    }
}

template<class It, class It2>
void radix_isort(It const begin, It const end, It2 const items)
{
    for (ptrdiff_t i = 0; i != end - begin; ++i) { items[i] = i; }
    size_t smax = 0;
    for (It i = begin; i != end; ++i)
    {
        size_t const n = i->size();
        smax = n > smax ? n : smax;
    }
    std::vector<std::vector<size_t> > buckets;
    for (size_t i = 0; i != smax; ++i)
    {
        radix_ipass(
            items, items + (end - begin),
            begin, smax - i - 1, buckets);
    }
}

#include <functional>

template<class Key>
struct key_comp : public Key
{
    explicit key_comp(Key const &key = Key()) : Key(key) { }
    template<class T>
    bool operator()(T const &a, T const &b) const
    { return this->Key::operator()(a) < this->Key::operator()(b); }
};

template<class Key>
key_comp<Key> make_key_comp(Key const &key) { return key_comp<Key>(key); }

template<class T1, class T2>
struct add : public std::binary_function<T1, T2, T1>
{ T1 operator()(T1 a, T2 const &b) const { return a += b; } };

template<class F>
struct deref : public F
{
    deref(F const &f) : F(f) { }
    typename std::iterator_traits<
        typename F::result_type
    >::value_type const
        &operator()(typename F::argument_type const &a) const
    { return *this->F::operator()(a); }
};

template<class T> deref<T> make_deref(T const &t) { return deref<T>(t); }

size_t xorshf96(void)  // random number generator
{
    static size_t x = 123456789, y = 362436069, z = 521288629;
    x ^= x << 16;
    x ^= x >> 5;
    x ^= x << 1;
    size_t t = x;
    x = y;
    y = z;
    z = t ^ x ^ y;
    return z;
}

template<class It>
struct fun_obj
{
        It beg;
        bool operator()(ptrdiff_t lhs, ptrdiff_t rhs)
        {
                return beg[lhs] < beg[rhs];
        }
};

template<class It>
fun_obj<It> make_fun_obj(It beg)
{
        return fun_obj<It>{beg};
}

struct uint32p8_t
{
        uint32_t m32;
        uint8_t m8;

        uint32p8_t(std::array<uint16_t, 3> const& a)
                : m32( a[0]<<(32-3*4) | a[1]<<(32-2*3*4) | (a[2]&0xF00)>>8)
                , m8( a[2]&0xFF )
        {
        }

        operator std::array<size_t, 3>() const
        {
                return {{m32&0xFFF00000 >> (32-3*4), m32&0x000FFF0 >> (32-2*3*4),
                         (m32&0xF)<<8 | m8}};
        }

        friend bool operator<(uint32p8_t const& lhs, uint32p8_t const& rhs)
        {
                if(lhs.m32 < rhs.m32) return true;
                if(lhs.m32 > rhs.m32) return false;
                return lhs.m8 < rhs.m8;
        }
};

#include <stdio.h>
#include <time.h>

#include <array>
#include <iostream>
#include <iomanip>
#include <utility>
#include <algorithm>
#include <cstdlib>
#include <iomanip>
#include <random>

int main(int argc, char* argv[])
{
    std::cout.sync_with_stdio(false);

    constexpr auto items_count_default = 2<<22;
    constexpr auto seed_default = 42;

    uint32_t const seed = argc > 1 ? std::atoll(argv[1]) : seed_default;
    std::cout << "seed: " << seed << "\n";
    size_t const items_count = argc > 2 ? std::atoll(argv[2]) : items_count_default;
    std::cout << "item count: " << items_count << "\n";

    using Items_array_value_t =
        #ifdef RADIX_SORT
            size_t
        #elif defined(STDSORT)
            uint16_t
        #elif defined(STDSORT_PACKED)
            uint16_t
        #endif
    ;

    typedef std::vector<std::array<Items_array_value_t, 3> > Items;
    Items items(items_count);

    auto const ranks_count =
        #ifdef RADIX_SORT
            items.size() * 2
        #elif defined(STDSORT)
            items.size()
        #elif defined(STDSORT_PACKED)
            items.size()
    #endif
    ;

    //auto prng = xorshf96;
    std::mt19937 gen(seed);
    std::uniform_int_distribution<> dist;
    auto prng = [&dist, &gen]{return dist(gen);};

    std::vector<size_t> ranks(ranks_count);
    for (size_t i = 0; i != items.size(); i++)
    {
        ranks[i] = i;

        for (size_t j = 0; j != items[i].size(); j++)
        { items[i][j] = prng() & 0xFFF; }
    }

    std::cout << "using ";
    clock_t const start = clock();
    #ifdef RADIX_SORT
        std::cout << "radix sort\n";
        radix_isort(items.begin(), items.end(), ranks.begin());
    #elif defined(STDSORT)
        std::cout << "std::sort\n";
        std::sort(ranks.begin(), ranks.begin() + items.size(),
                  make_fun_obj(items.cbegin())
                  //make_key_comp(make_deref(std::bind1st(
                  //    add<Items::const_iterator, ptrdiff_t>(),
                  //    items.begin())))
                 );
    #elif defined(STDSORT_PACKED)
        std::cout << "std::sort with a packed data structure\n";
        using Items_ranks = std::vector< std::pair<uint32p8_t,
                                         decltype(ranks)::value_type> >;
        Items_ranks items_ranks;

        size_t i = 0;
        for(auto iI = items.cbegin(); iI != items.cend(); ++iI, ++i)
        {
            items_ranks.emplace_back(*iI, i);
        }

        std::sort(begin(items_ranks), end(items_ranks),
                  [](Items_ranks::value_type const& lhs,
                     Items_ranks::value_type const& rhs)
                  { return lhs.first < rhs.first; }
                 );
        std::transform(items_ranks.cbegin(), items_ranks.cend(), begin(ranks),
                       [](Items_ranks::value_type const& e) { return e.second; }
                      );
    #endif
    auto const duration = (clock() - start) / (CLOCKS_PER_SEC / 1000);

    bool const sorted = std::is_sorted(ranks.begin(), ranks.begin() + items.size(),
                                       make_fun_obj(items.cbegin()));

    std::cout << "duration: " << duration << " ms\n"
              << "result sorted: " << std::boolalpha << sorted << "\n";

    return 0;
}

这篇关于如何优化间接基数排序? (也就是如何优化不可预测的内存访问模式)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆