How to optimize indirect radix sorting? (aka how to optimize unpredictable memory access patterns)

I wrote an indirect radix sorting algorithm in C ++ (indirectly, I mean that it returns element indices):

#include <algorithm> #include <iterator> #include <vector> template<class It1, class It2> void radix_ipass( It1 begin, It1 const end, It2 const a, size_t const i, std::vector<std::vector<size_t> > &buckets) { size_t ncleared = 0; for (It1 j = begin; j != end; ++j) { size_t const k = a[*j][i]; while (k >= ncleared && ncleared < buckets.size()) { buckets[ncleared++].clear(); } if (k >= buckets.size()) { buckets.resize(k + 1); ncleared = buckets.size(); } buckets[k].push_back(size_t()); using std::swap; swap(buckets[k].back(), *j); } for (std::vector<std::vector<size_t> >::iterator j = buckets.begin(); j != buckets.begin() + ncleared; j->clear(), ++j) { begin = std::swap_ranges(j->begin(), j->end(), begin); } } template<class It, class It2> void radix_isort(It const begin, It const end, It2 const items) { for (ptrdiff_t i = 0; i != end - begin; ++i) { items[i] = i; } size_t smax = 0; for (It i = begin; i != end; ++i) { size_t const n = i->size(); smax = n > smax ? n : smax; } std::vector<std::vector<size_t> > buckets; for (size_t i = 0; i != smax; ++i) { radix_ipass( items, items + (end - begin), begin, smax - i - 1, buckets); } } 

It seems to work about 40% faster than std::sort when I test it with the following code (3920 ms compared to 6530 ms):

 #include <functional> template<class Key> struct key_comp : public Key { explicit key_comp(Key const &key = Key()) : Key(key) { } template<class T> bool operator()(T const &a, T const &b) const { return this->Key::operator()(a) < this->Key::operator()(b); } }; template<class Key> key_comp<Key> make_key_comp(Key const &key) { return key_comp<Key>(key); } template<class T1, class T2> struct add : public std::binary_function<T1, T2, T1> { T1 operator()(T1 a, T2 const &b) const { return a += b; } }; template<class F> struct deref : public F { deref(F const &f) : F(f) { } typename std::iterator_traits< typename F::result_type >::value_type const &operator()(typename F::argument_type const &a) const { return *this->F::operator()(a); } }; template<class T> deref<T> make_deref(T const &t) { return deref<T>(t); } size_t xorshf96(void) // random number generator { static size_t x = 123456789, y = 362436069, z = 521288629; x ^= x << 16; x ^= x >> 5; x ^= x << 1; size_t t = x; x = y; y = z; z = t ^ x ^ y; return z; } #include <stdio.h> #include <time.h> #include <array> int main(void) { typedef std::vector<std::array<size_t, 3> > Items; Items items(1 << 24); std::vector<size_t> ranks(items.size() * 2); for (size_t i = 0; i != items.size(); i++) { ranks[i] = i; for (size_t j = 0; j != items[i].size(); j++) { items[i][j] = xorshf96() & 0xFFF; } } clock_t const start = clock(); if (1) { radix_isort(items.begin(), items.end(), ranks.begin()); } else // STL sorting { std::sort( ranks.begin(), ranks.begin() + items.size(), make_key_comp(make_deref(std::bind1st( add<Items::const_iterator, ptrdiff_t>(), items.begin())))); } printf("%u ms\n", (unsigned)((clock() - start) * 1000 / CLOCKS_PER_SEC), std::min(ranks.begin(), ranks.end())); return 0; } 

Hmm, I think the best I can do, I thought.

But after many head bangs against the wall, I realized that prefetching at the beginning of radix_ipass can help reduce the result to 1440 ms (!):

 #include <xmmintrin.h> ... for (It1 j = begin; j != end; ++j) { #if defined(_MM_TRANSPOSE4_PS) // should be defined if xmmintrin.h is included enum { N = 8 }; if (end - j > N) { _mm_prefetch((char const *)(&a[j[N]][i]), _MM_HINT_T0); } #endif ... } 

Clearly, the bottleneck is memory bandwidth - the access pattern is unpredictable.

So now my question is: what else can I do to make it even faster on similar amounts of data?

Or is there not much room left for improvement?

(I hope to avoid compromising the readability of the code if possible, so if the readability is damaged, the improvement should be significant.)

+6
source share
1 answer

Using a more compact data structure, combining ranks and values, can increase std::sort performance by 2–3 times. Essentially, now sorting is done on vector<pair<Value,Rank>> . The data type Value , std::array<integer_type, 3> been replaced for this with a more compact data structure pair<uint32_t, uint8_t> . Only half the byte is not used, and the comparison < can be performed in two stages, first using the supposedly efficient comparison of uint32_t (it is not clear whether the loop used by std::array<..>::operator< be optimized similarly to fast code, but replacing std::array<integer_type,3> on this data structure led to another performance improvement).

However, this is not as efficient as radix sorting. (Maybe you can customize QuickSort with prefetching?)

Besides this additional sorting method, I replaced xorshf96 with mt19937 , because I know how to provide seed for the latter;)

The seed and number of values ​​can be changed using two command line arguments: first the seed, then the counter.

Compiled with g ++ 4.9.0 20131022 using -std=c++11 -march=native -O3 , for 64-bit linux

Run examples; important note running on a Core2Duo U9400 processor (old and slow!)

  item count: 16000000
 using std :: sort
 duration: 12260 ms
 result sorted: true

 seed: 5648
 item count: 16000000
 using std :: sort
 duration: 12230 ms
 result sorted: true

 seed: 5648
 item count: 16000000
 using std :: sort
 duration: 12230 ms
 result sorted: true


 seed: 5648
 item count: 16000000
 using std :: sort with a packed data structure
 duration: 4290 ms
 result sorted: true

 seed: 5648
 item count: 16000000
 using std :: sort with a packed data structure
 duration: 4270 ms
 result sorted: true

 seed: 5648
 item count: 16000000
 using std :: sort with a packed data structure
 duration: 4280 ms
 result sorted: true


 item count: 16000000
 using radix sort
 duration: 3790 ms
 result sorted: true

 seed: 5648
 item count: 16000000
 using radix sort
 duration: 3820 ms
 result sorted: true

 seed: 5648
 item count: 16000000
 using radix sort
 duration: 3780 ms
 result sorted: true

New or changed code:

 template<class It> struct fun_obj { It beg; bool operator()(ptrdiff_t lhs, ptrdiff_t rhs) { return beg[lhs] < beg[rhs]; } }; template<class It> fun_obj<It> make_fun_obj(It beg) { return fun_obj<It>{beg}; } struct uint32p8_t { uint32_t m32; uint8_t m8; uint32p8_t(std::array<uint16_t, 3> const& a) : m32( a[0]<<(32-3*4) | a[1]<<(32-2*3*4) | (a[2]&0xF00)>>8) , m8( a[2]&0xFF ) { } operator std::array<size_t, 3>() const { return {{m32&0xFFF00000 >> (32-3*4), m32&0x000FFF0 >> (32-2*3*4), (m32&0xF)<<8 | m8}}; } friend bool operator<(uint32p8_t const& lhs, uint32p8_t const& rhs) { if(lhs.m32 < rhs.m32) return true; if(lhs.m32 > rhs.m32) return false; return lhs.m8 < rhs.m8; } }; #include <stdio.h> #include <time.h> #include <array> #include <iostream> #include <iomanip> #include <utility> #include <algorithm> #include <cstdlib> #include <iomanip> #include <random> int main(int argc, char* argv[]) { std::cout.sync_with_stdio(false); constexpr auto items_count_default = 2<<22; constexpr auto seed_default = 42; uint32_t const seed = argc > 1 ? std::atoll(argv[1]) : seed_default; std::cout << "seed: " << seed << "\n"; size_t const items_count = argc > 2 ? std::atoll(argv[2]) : items_count_default; std::cout << "item count: " << items_count << "\n"; using Items_array_value_t = #ifdef RADIX_SORT size_t #elif defined(STDSORT) uint16_t #elif defined(STDSORT_PACKED) uint16_t #endif ; typedef std::vector<std::array<Items_array_value_t, 3> > Items; Items items(items_count); auto const ranks_count = #ifdef RADIX_SORT items.size() * 2 #elif defined(STDSORT) items.size() #elif defined(STDSORT_PACKED) items.size() #endif ; //auto prng = xorshf96; std::mt19937 gen(seed); std::uniform_int_distribution<> dist; auto prng = [&dist, &gen]{return dist(gen);}; std::vector<size_t> ranks(ranks_count); for (size_t i = 0; i != items.size(); i++) { ranks[i] = i; for (size_t j = 0; j != items[i].size(); j++) { items[i][j] = prng() & 0xFFF; } } std::cout << "using "; clock_t const start = clock(); #ifdef RADIX_SORT std::cout << "radix sort\n"; radix_isort(items.begin(), items.end(), ranks.begin()); #elif defined(STDSORT) std::cout << "std::sort\n"; std::sort(ranks.begin(), ranks.begin() + items.size(), make_fun_obj(items.cbegin()) //make_key_comp(make_deref(std::bind1st( // add<Items::const_iterator, ptrdiff_t>(), // items.begin()))) ); #elif defined(STDSORT_PACKED) std::cout << "std::sort with a packed data structure\n"; using Items_ranks = std::vector< std::pair<uint32p8_t, decltype(ranks)::value_type> >; Items_ranks items_ranks; size_t i = 0; for(auto iI = items.cbegin(); iI != items.cend(); ++iI, ++i) { items_ranks.emplace_back(*iI, i); } std::sort(begin(items_ranks), end(items_ranks), [](Items_ranks::value_type const& lhs, Items_ranks::value_type const& rhs) { return lhs.first < rhs.first; } ); std::transform(items_ranks.cbegin(), items_ranks.cend(), begin(ranks), [](Items_ranks::value_type const& e) { return e.second; } ); #endif auto const duration = (clock() - start) / (CLOCKS_PER_SEC / 1000); bool const sorted = std::is_sorted(ranks.begin(), ranks.begin() + items.size(), make_fun_obj(items.cbegin())); std::cout << "duration: " << duration << " ms\n" << "result sorted: " << std::boolalpha << sorted << "\n"; return 0; } 

Full code:

 #include <algorithm> #include <iterator> #include <vector> #include <cstddef> using std::size_t; using std::ptrdiff_t; #include <xmmintrin.h> template<class It1, class It2> void radix_ipass( It1 begin, It1 const end, It2 const a, size_t const i, std::vector<std::vector<size_t> > &buckets) { size_t ncleared = 0; for (It1 j = begin; j != end; ++j) { #if defined(_MM_TRANSPOSE4_PS) constexpr auto N = 8; if(end - j > N) { _mm_prefetch((char const *)(&a[j[N]][i]), _MM_HINT_T0); } #else #error SS intrinsic not found #endif size_t const k = a[*j][i]; while (k >= ncleared && ncleared < buckets.size()) { buckets[ncleared++].clear(); } if (k >= buckets.size()) { buckets.resize(k + 1); ncleared = buckets.size(); } buckets[k].push_back(size_t()); using std::swap; swap(buckets[k].back(), *j); } for (std::vector<std::vector<size_t> >::iterator j = buckets.begin(); j != buckets.begin() + ncleared; j->clear(), ++j) { begin = std::swap_ranges(j->begin(), j->end(), begin); } } template<class It, class It2> void radix_isort(It const begin, It const end, It2 const items) { for (ptrdiff_t i = 0; i != end - begin; ++i) { items[i] = i; } size_t smax = 0; for (It i = begin; i != end; ++i) { size_t const n = i->size(); smax = n > smax ? n : smax; } std::vector<std::vector<size_t> > buckets; for (size_t i = 0; i != smax; ++i) { radix_ipass( items, items + (end - begin), begin, smax - i - 1, buckets); } } #include <functional> template<class Key> struct key_comp : public Key { explicit key_comp(Key const &key = Key()) : Key(key) { } template<class T> bool operator()(T const &a, T const &b) const { return this->Key::operator()(a) < this->Key::operator()(b); } }; template<class Key> key_comp<Key> make_key_comp(Key const &key) { return key_comp<Key>(key); } template<class T1, class T2> struct add : public std::binary_function<T1, T2, T1> { T1 operator()(T1 a, T2 const &b) const { return a += b; } }; template<class F> struct deref : public F { deref(F const &f) : F(f) { } typename std::iterator_traits< typename F::result_type >::value_type const &operator()(typename F::argument_type const &a) const { return *this->F::operator()(a); } }; template<class T> deref<T> make_deref(T const &t) { return deref<T>(t); } size_t xorshf96(void) // random number generator { static size_t x = 123456789, y = 362436069, z = 521288629; x ^= x << 16; x ^= x >> 5; x ^= x << 1; size_t t = x; x = y; y = z; z = t ^ x ^ y; return z; } template<class It> struct fun_obj { It beg; bool operator()(ptrdiff_t lhs, ptrdiff_t rhs) { return beg[lhs] < beg[rhs]; } }; template<class It> fun_obj<It> make_fun_obj(It beg) { return fun_obj<It>{beg}; } struct uint32p8_t { uint32_t m32; uint8_t m8; uint32p8_t(std::array<uint16_t, 3> const& a) : m32( a[0]<<(32-3*4) | a[1]<<(32-2*3*4) | (a[2]&0xF00)>>8) , m8( a[2]&0xFF ) { } operator std::array<size_t, 3>() const { return {{m32&0xFFF00000 >> (32-3*4), m32&0x000FFF0 >> (32-2*3*4), (m32&0xF)<<8 | m8}}; } friend bool operator<(uint32p8_t const& lhs, uint32p8_t const& rhs) { if(lhs.m32 < rhs.m32) return true; if(lhs.m32 > rhs.m32) return false; return lhs.m8 < rhs.m8; } }; #include <stdio.h> #include <time.h> #include <array> #include <iostream> #include <iomanip> #include <utility> #include <algorithm> #include <cstdlib> #include <iomanip> #include <random> int main(int argc, char* argv[]) { std::cout.sync_with_stdio(false); constexpr auto items_count_default = 2<<22; constexpr auto seed_default = 42; uint32_t const seed = argc > 1 ? std::atoll(argv[1]) : seed_default; std::cout << "seed: " << seed << "\n"; size_t const items_count = argc > 2 ? std::atoll(argv[2]) : items_count_default; std::cout << "item count: " << items_count << "\n"; using Items_array_value_t = #ifdef RADIX_SORT size_t #elif defined(STDSORT) uint16_t #elif defined(STDSORT_PACKED) uint16_t #endif ; typedef std::vector<std::array<Items_array_value_t, 3> > Items; Items items(items_count); auto const ranks_count = #ifdef RADIX_SORT items.size() * 2 #elif defined(STDSORT) items.size() #elif defined(STDSORT_PACKED) items.size() #endif ; //auto prng = xorshf96; std::mt19937 gen(seed); std::uniform_int_distribution<> dist; auto prng = [&dist, &gen]{return dist(gen);}; std::vector<size_t> ranks(ranks_count); for (size_t i = 0; i != items.size(); i++) { ranks[i] = i; for (size_t j = 0; j != items[i].size(); j++) { items[i][j] = prng() & 0xFFF; } } std::cout << "using "; clock_t const start = clock(); #ifdef RADIX_SORT std::cout << "radix sort\n"; radix_isort(items.begin(), items.end(), ranks.begin()); #elif defined(STDSORT) std::cout << "std::sort\n"; std::sort(ranks.begin(), ranks.begin() + items.size(), make_fun_obj(items.cbegin()) //make_key_comp(make_deref(std::bind1st( // add<Items::const_iterator, ptrdiff_t>(), // items.begin()))) ); #elif defined(STDSORT_PACKED) std::cout << "std::sort with a packed data structure\n"; using Items_ranks = std::vector< std::pair<uint32p8_t, decltype(ranks)::value_type> >; Items_ranks items_ranks; size_t i = 0; for(auto iI = items.cbegin(); iI != items.cend(); ++iI, ++i) { items_ranks.emplace_back(*iI, i); } std::sort(begin(items_ranks), end(items_ranks), [](Items_ranks::value_type const& lhs, Items_ranks::value_type const& rhs) { return lhs.first < rhs.first; } ); std::transform(items_ranks.cbegin(), items_ranks.cend(), begin(ranks), [](Items_ranks::value_type const& e) { return e.second; } ); #endif auto const duration = (clock() - start) / (CLOCKS_PER_SEC / 1000); bool const sorted = std::is_sorted(ranks.begin(), ranks.begin() + items.size(), make_fun_obj(items.cbegin())); std::cout << "duration: " << duration << " ms\n" << "result sorted: " << std::boolalpha << sorted << "\n"; return 0; } 
+1
source

Source: https://habr.com/ru/post/958140/


All Articles