16 #ifndef TLX_SORT_STRINGS_PARALLEL_SAMPLE_SORT_HEADER
17 #define TLX_SORT_STRINGS_PARALLEL_SAMPLE_SORT_HEADER
41 namespace sort_strings_detail {
48 class PS5ParametersDefault
93 template <
typename Parameters>
123 template <
typename StringPtr>
128 size_t threshold = this->smallsort_threshold;
129 if (this->enable_rest_size) {
139 if (this->enable_rest_size)
// Number of whole leading bytes on which the two keys agree. The keys are
// XORed so that matching bits become zero, and the clz() result for that value
// is divided by eight. clz() is defined outside this block (presumably a
// count-leading-zero-bits helper -- confirm, including its behaviour when the
// two keys are equal and the XOR is zero).
template <typename KeyType>
static inline unsigned char
lcpKeyType(const KeyType& a, const KeyType& b) {
    const auto differing_bits = a ^ b;
    return clz(differing_bits) / 8;
}
// NOTE(review): the name and parameter list of this helper are not visible
// here; other parts of this file call lcpKeyDepth(key), which presumably refers
// to this template -- confirm.
154 template <
typename KeyType>
155 static inline unsigned char
// Result is sizeof(KeyType) minus ctz(a) / 8. ctz() is defined outside this
// block (presumably count-trailing-zero-bits, which would make this the key
// size in bytes less the number of trailing zero bytes -- confirm).
158 return sizeof(KeyType) - (
ctz(a) / 8);
// NOTE(review): second helper; its name and parameters (a, d) are likewise not
// visible here.
162 template <
typename KeyType>
163 static inline unsigned char
// Shifts `a` right by 8 * (sizeof(KeyType) - 1 - d) bits and truncates to
// unsigned char, i.e. it picks one byte of the key; d == 0 gives the largest
// shift and therefore the most significant byte of a KeyType-wide value.
165 return static_cast<unsigned char>(a >> (8 * (
sizeof(KeyType) - 1 - d)));
// LCP pass over the bucket boundaries produced by a sample sort step; it is
// instantiated elsewhere in this file as ps5_sample_sort_lcp<bktnum_>(...).
// NOTE(review): only part of the signature and body is visible here, so the
// comments below describe just the statements shown.
204 template <
size_t bktnum,
typename Context,
typename Classify,
205 typename StringPtr,
typename BktSizeType>
// bkt is read as an array of bucket boundaries: bucket b covers the index
// range [bkt[b], bkt[b + 1]) and is empty when both boundaries are equal.
208 const BktSizeType* bkt) {
// LCP entries are written through strptr, which must not be flipped here.
209 assert(!strptr.flipped());
212 typedef typename Context::key_type key_type;
// Holds the key of the last string of the most recently seen non-empty bucket.
215 key_type prevkey = 0;
// Non-empty bucket handled via its splitter: prevkey becomes splitter b / 2,
// and the assert cross-checks that against the key of the bucket's last string.
226 if (bkt[b] != bkt[b + 1]) {
227 prevkey = classifier.get_splitter(b / 2);
228 assert(prevkey == get_key_at<key_type>(strset, bkt[b + 1] - 1, depth));
// Non-empty bucket without a splitter lookup: prevkey is read from the key of
// the bucket's last string.
234 if (bkt[b] != bkt[b + 1]) {
235 prevkey = get_key_at<key_type>(strset, bkt[b + 1] - 1, depth);
// Jump straight to the even-bucket handling while b is in range and even.
244 if (b < bktnum && b % 2 == 0)
goto even_bucket;
// Odd bucket: its first string is asserted to carry exactly splitter b / 2 as
// key. The LCP entry at the bucket's first position is set to depth + rlcp;
// rlcp itself is computed on a line not shown here.
250 if (bkt[b] != bkt[b + 1]) {
251 key_type thiskey = classifier.get_splitter(b / 2);
252 assert(thiskey == get_key_at<key_type>(strset, bkt[b], depth));
255 strptr.
set_lcp(bkt[b], depth + rlcp);
259 <<
"LCP at odd-bucket " << b
260 <<
" [" << bkt[b] <<
"," << bkt[b + 1] <<
")"
261 <<
" is " << depth + rlcp;
// Sanity check: prevkey still matches the key of this bucket's last string.
264 assert(prevkey == get_key_at<key_type>(strset, bkt[b + 1] - 1, depth));
// Even bucket: the key of the first string is read from the string set rather
// than from a splitter; the boundary LCP is again depth + rlcp.
269 if (bkt[b] != bkt[b + 1]) {
270 key_type thiskey = get_key_at<key_type>(strset, bkt[b], depth);
273 strptr.
set_lcp(bkt[b], depth + rlcp);
277 <<
"LCP at even-bucket " << b
278 <<
" [" << bkt[b] <<
"," << bkt[b + 1] <<
")"
279 <<
" is " << depth + rlcp;
// Remember the key of this bucket's last string for the next boundary.
281 prevkey = get_key_at<key_type>(strset, bkt[b + 1] - 1, depth);
// Job class derived from PS5SortStep, parameterised on the sorting context,
// the string pointer type and the integer type used for bucket sizes.
// NOTE(review): most member declarations and method bodies are not visible
// here; comments cover only the statements shown.
290 template <
typename Context,
typename StringPtr,
typename BktSizeType>
291 class PS5SmallsortJob :
public PS5SortStep
// Key type is taken from the context.
303 typedef typename Context::key_type
key_type;
// Tail of a parameter list: the strings to sort and the current depth.
308 const StringPtr& strptr,
size_t depth)
311 <<
"enqueue depth=" <<
depth_
329 <<
"Process PS5SmallsortJob " <<
this <<
" of size " << n;
// Branch taken when sequential sample sort is enabled in the context and the
// input has at least ctx_.smallsort_threshold strings (branch body not shown;
// presumably it selects the sequential sample sort path -- confirm).
334 if (ctx_.enable_sequential_sample_sort && n >=
ctx_.smallsort_threshold)
// Fragments of one sequential sample sort step: sampling, bucket counting and
// distribution. NOTE(review): declarations of samples, sample_size, bktcache,
// bktsize, bkt, sorted and strB are not visible here.
// Number of splitters comes from the context's classifier type.
364 static const size_t num_splitters = Context::Classify::num_splitters;
377 const size_t oversample_factor = 2;
// The generator is seeded with the address of the sample buffer, so the seed
// depends on where that buffer happens to live.
384 std::minstd_rand rng(
reinterpret_cast<uintptr_t
>(samples.data()));
// Each sample is the key (at depth_) of the string at index rng() % n.
386 for (
size_t i = 0; i < sample_size; ++i)
387 samples[i] = get_key_at<key_type>(strset, rng() % n,
depth_);
// Samples are put into ascending order.
389 std::sort(samples.begin(), samples.end());
// Argument list of a call that receives the whole string range, the bucket
// cache and the depth (callee not visible; presumably the classification call
// that fills bktcache -- confirm).
396 strset, strset.begin(), strset.end(), bktcache,
depth_);
// Count how many strings were assigned to each bucket id.
403 for (
size_t si = 0; si < n; ++si)
404 ++bktsize[bktcache[si]];
// Running sum: bkt[i] is bkt[i - 1] plus the count of bucket i.
409 for (
unsigned int i = 1; i <
bktnum; ++i) {
410 bkt[i] =
bkt[i - 1] + bktsize[i];
420 typename StringSet::Iterator sbegin = sorted.begin();
// Distribution: for every string, the boundary of its bucket is decremented
// first and the string is then moved into that slot of `sorted`. The bucket
// cache pointer advances in lockstep with the string iterator.
422 for (
typename StringSet::Iterator str = strB.begin();
423 str != strB.end(); ++str, ++bktcache)
424 *(sbegin + --
bkt[*bktcache]) = std::move(*str);
434 TLX_LOGC(ctx.debug_lcp) <<
"Calculate LCP after sample sort step";
// Stack of sequential sample sort steps.
442 std::vector<SeqSampleSortStep>
ss_stack_;
// Fragments of the loop that walks the buckets of the current sample sort step
// `s` and decides per bucket how to continue. NOTE(review): the enclosing
// function header, the definition of `i`/`s` and several branch bodies are not
// visible here.
445 typedef SeqSampleSortStep Step;
// The bucket cache storage is reinterpreted as an array of 16-bit entries.
450 uint16_t* bktcache =
reinterpret_cast<uint16_t*
>(
bktcache_.data());
462 if (i < Step::bktnum)
// Size of bucket i, from two adjacent boundaries.
464 size_t bktsize = s.bkt[i + 1] - s.bkt[i];
// String pointer restricted to this bucket, obtained through flip().
466 StringPtr sp = s.strptr_.flip(s.bkt[i], bktsize);
// Buckets below the small-sort threshold take this branch.
474 else if (bktsize <
ctx_.smallsort_threshold)
476 assert(i / 2 <= Step::num_splitters);
// The last bucket logs "no lcp"; every other bucket logs the low seven bits of
// splitter_lcp[i / 2].
477 if (i == Step::bktnum - 1)
479 <<
"Recurse[" << s.depth_ <<
"]: > bkt "
480 << i <<
" size " << bktsize <<
" no lcp";
483 <<
"Recurse[" << s.depth_ <<
"]: < bkt "
484 << i <<
" size " << bktsize <<
" lcp "
485 << int(s.splitter_lcp[i / 2] & 0x7F);
// Argument tail of a call (callee not shown): the bucket and a depth advanced
// by the low seven bits of splitter_lcp[i / 2].
489 sp, s.depth_ + (s.splitter_lcp[i / 2] & 0x7F));
493 if (i == Step::bktnum - 1)
495 <<
"Recurse[" << s.depth_ <<
"]: > bkt "
496 << i <<
" size " << bktsize <<
" no lcp";
499 <<
"Recurse[" << s.depth_ <<
"]: < bkt "
500 << i <<
" size " << bktsize <<
" lcp "
501 << int(s.splitter_lcp[i / 2] & 0x7F);
// Argument tail of a call that additionally receives the context and the
// bucket cache (callee not shown; presumably a nested sample sort step).
504 ctx_, sp, s.depth_ + (s.splitter_lcp[i / 2] & 0x7F), bktcache);
// Bit 0x80 of splitter_lcp[i / 2] selects the "is done!" path: the bucket is
// copied back and its size is reported through donesize().
513 else if (s.splitter_lcp[i / 2] & 0x80) {
516 <<
"Recurse[" << s.depth_ <<
"]: = bkt "
517 << i <<
" size " << bktsize <<
" is done!";
518 StringPtr spb = sp.copy_back();
// Argument tail (callee not shown): depth plus lcpKeyDepth of splitter i / 2.
522 s.depth_ +
lcpKeyDepth(s.classifier.get_splitter(i / 2)));
524 ctx_.donesize(bktsize);
526 else if (bktsize <
ctx_.smallsort_threshold)
529 <<
"Recurse[" << s.depth_ <<
"]: = bkt "
530 << i <<
" size " << bktsize <<
" lcp keydepth!";
// Time spent from here on is attributed to the "mkqs" timer entry.
532 ScopedMultiTimerSwitch sts_inssort(
mtimer_,
"mkqs");
538 <<
"Recurse[" << s.depth_ <<
"]: = bkt "
539 << i <<
" size " << bktsize <<
" lcp keydepth!";
// Work sharing is considered only when enabled and the thread pool reports an
// idle thread.
557 if (
ctx_.enable_work_sharing &&
ctx_.threads_.has_idle()) {
// Fragments of the routine that hands the remaining buckets of a sample sort
// step over to the context as separate jobs. NOTE(review): the function header
// and several branch bodies are not visible here.
573 <<
"Freeing top level of PS5SmallsortJob's sample_sort stack";
575 typedef SeqSampleSortStep Step;
// Continue until the step's bucket index has passed the last bucket.
578 while (s.idx_ < Step::bktnum)
582 size_t bktsize = s.bkt[i + 1] - s.bkt[i];
584 StringPtr sp = s.strptr_.flip(s.bkt[i], bktsize);
// The last bucket logs "no lcp"; other buckets log the low seven bits of
// splitter_lcp[i / 2].
594 if (i == Step::bktnum - 1)
596 <<
"Recurse[" << s.depth_ <<
"]: > bkt "
597 << i <<
" size " << bktsize <<
" no lcp";
600 <<
"Recurse[" << s.depth_ <<
"]: < bkt "
601 << i <<
" size " << bktsize <<
" lcp "
602 << int(s.splitter_lcp[i / 2] & 0x7F);
// The bucket is enqueued as a new job under this step, with the depth advanced
// by the low seven bits of splitter_lcp[i / 2].
605 ctx_.enqueue(
this, sp,
606 s.depth_ + (s.splitter_lcp[i / 2] & 0x7F));
// Bit 0x80 of splitter_lcp[i / 2] selects the "is done!" path: copy back and
// report the bucket size as done instead of enqueuing.
615 else if (s.splitter_lcp[i / 2] & 0x80) {
618 <<
"Recurse[" << s.depth_ <<
"]: = bkt "
619 << i <<
" size " << bktsize <<
" is done!";
620 StringPtr spb = sp.copy_back();
// Argument tail of a call whose start is not shown; it uses splitter i / 2.
624 s.classifier.get_splitter(i / 2)));
626 ctx_.donesize(bktsize);
631 <<
"Recurse[" << s.depth_ <<
"]: = bkt "
632 << i <<
" size " << bktsize <<
" lcp keydepth!";
// The bucket is enqueued with the depth advanced by a full key width.
635 ctx_.enqueue(
this, sp, s.depth_ +
sizeof(
key_type));
// Three-way comparison result: 1 when a > b, -1 when a < b, otherwise 0.
// NOTE(review): the signature this return statement belongs to is not visible
// here.
648 return (a > b) ? 1 : (a < b) ? -1 : 0;
// Returns one of the indices i, j, k depending on how A[i], A[j] and A[k]
// compare. NOTE(review): this reads like a median-of-three index selection,
// but the return type and some of the branching lines are not visible here,
// so the exact decision tree cannot be confirmed from this excerpt.
651 template <
typename Type>
653 med3(Type* A,
size_t i,
size_t j,
size_t k) {
// Equal elements short-circuit: i when A[i] == A[j], k when A[k] equals
// either of the other two.
654 if (A[i] == A[j])
return i;
655 if (A[k] == A[i] || A[k] == A[j])
return k;
657 if (A[j] < A[k])
return j;
658 if (A[i] < A[k])
return k;
662 if (A[j] > A[k])
return j;
663 if (A[i] < A[k])
return i;
// Insertion sort fragments that keep the string array and the parallel key
// cache in the same order. NOTE(review): the function header, the declarations
// of pi/pj/tmpc and the statement guarded by the comparison are not visible
// here.
671 const StringSet& strings = strptr.active();
672 size_t n = strptr.size();
// pi starts at the second element; the condition pre-decrements n, so the body
// runs size - 1 times. NOTE(review): n is a size_t, so n == 0 would wrap
// around on --n; callers presumably never pass an empty range -- confirm.
674 for (pi = 1; --n > 0; ++pi) {
// The string being inserted is moved out of its slot.
675 typename StringSet::String tmps = std::move(strings.at(pi));
// Walk left from pi; elements are shifted one slot to the right (string and
// cached key together) while the comparison below does not hold.
677 for (pj = pi; pj > 0; --pj) {
678 if (cache[pj - 1] <= tmpc)
680 strings.at(pj) = std::move(strings.at(pj - 1));
681 cache[pj] = cache[pj - 1];
// The held string is moved into the slot where the walk stopped.
683 strings.at(pj) = std::move(tmps);
// Fragments of a routine, templated on a CacheDirty flag, that scans the
// cached keys for runs of equal keys and handles each run.
// NOTE(review): the function header and the calls whose argument lists appear
// below are not fully visible here.
689 template <
bool CacheDirty>
// Work on the copied-back view of the input.
692 StringPtr strptr = _strptr.copy_back();
// Nothing to do for zero or one string.
694 if (strptr.size() <= 1)
return;
// start/bktsize describe the current run of equal cached keys.
700 size_t start = 0, bktsize = 1;
701 for (
size_t i = 0; i < strptr.size() - 1; ++i) {
// Adjacent equal keys belong to the same run.
703 if (cache[i] == cache[i + 1]) {
// For every run except the first, when LCPs are tracked, the LCP entry at the
// run's first position is depth plus the key LCP of the previous key and this
// run's key.
708 if (start != 0 && strptr.with_lcp) {
709 int rlcp =
lcpKeyType(cache[start - 1], cache[start]);
710 strptr.set_lcp(start, depth + rlcp);
// A non-zero lowest key byte selects the branch that continues one key width
// deeper (presumably the strings extend past this key -- confirm); otherwise
// the run's LCP entries are filled with depth + lcpKeyDepth(key).
715 if (cache[start] & 0xFF) {
718 strptr.sub(start, bktsize), depth +
sizeof(
key_type),
723 strptr.sub(start, bktsize).fill_lcp(depth +
lcpKeyDepth(cache[start]));
// Same handling repeated for another run (presumably the final run after the
// scan loop -- the surrounding lines are not visible).
730 if (start != 0 && strptr.with_lcp) {
731 int rlcp =
lcpKeyType(cache[start - 1], cache[start]);
732 strptr.set_lcp(start, depth + rlcp);
736 if (cache[start] & 0xFF) {
739 strptr.sub(start, bktsize), depth +
sizeof(
key_type),
744 strptr.sub(start, bktsize).fill_lcp(depth +
lcpKeyDepth(cache[start]));
// Constructor fragments of one multikey-quicksort-style partitioning step over
// cached keys. NOTE(review): member initialisation, the pivot selection
// statement, loop headers and the branch conditions on `r` are not fully
// visible here.
760 MKQSStep(Context& ctx,
const StringPtr& strptr,
761 key_type* cache,
size_t depth,
bool CacheDirty)
// Fill the key cache with the key of every string at the given depth
// (presumably only when CacheDirty is set -- the guard is not shown).
768 typename StringSet::Iterator it = strset.begin();
769 for (
size_t i = 0; i < n; ++i, ++it) {
770 cache_[i] = get_key<key_type>(strset, *it, depth);
// One med3() call over three positions spread around the middle of the range.
777 med3(
cache_, n / 2 - n / 8, n / 2, n / 2 + n / 8),
// Track the largest key seen below the pivot and the smallest key seen above
// it, starting from the extreme values.
785 key_type max_lt = 0, min_gt = std::numeric_limits<key_type>::max();
// Partition cursors; both ends start one element in from the range borders.
789 size_t leq = 1, llt = 1, rgt = n - 1, req = n - 1;
// Left-hand scan: compare the key at llt with the pivot.
794 int r =
cmp(cache[llt], pivot);
796 min_gt =
std::min(min_gt, cache[llt]);
800 std::swap(strset.at(leq), strset.at(llt));
805 max_lt = std::max(max_lt, cache[llt]);
// Right-hand scan: compare the key at rgt with the pivot.
811 int r =
cmp(cache[rgt], pivot);
813 max_lt = std::max(max_lt, cache[rgt]);
817 std::swap(strset.at(req), strset.at(rgt));
822 min_gt =
std::min(min_gt, cache[rgt]);
// Exchange the two elements the scans stopped at.
828 std::swap(strset.at(llt), strset.at(rgt));
// Element counts of the two border regions delimited by leq and req.
834 size_t num_leq = leq, num_req = n - 1 - req;
// Block swaps applied to strings and cached keys alike: the first size1
// elements are exchanged with the size1 elements ending at llt, and size2
// elements starting at llt are exchanged with the last size2 elements.
843 std::swap_ranges(strset.begin(), strset.begin() + size1,
844 strset.begin() + llt - size1);
845 std::swap_ranges(cache, cache + size1, cache + llt - size1);
849 std::swap_ranges(strset.begin() + llt, strset.begin() + llt + size2,
850 strset.begin() + n - size2);
851 std::swap_ranges(cache + llt, cache + llt + size2,
// Debug checks that the tracked extremes match a recomputation over a range
// (range arguments not shown).
859 assert(max_lt == *std::max_element(
871 assert(min_gt == *std::min_element(
// Count this step in the context's base sort statistics.
879 ++ctx.base_sort_steps;
// Fragments of the driver that processes the three partitions (less, equal,
// greater) of each step `ms`. NOTE(review): the function header, the stack
// handling and several calls are only partly visible here.
// Insertion sort path: taken when sequential mkqs is disabled or the input is
// below the insertion sort threshold.
901 if (!
ctx_.enable_sequential_mkqs ||
902 strptr.size() <
ctx_.inssort_threshold) {
904 <<
"insertion_sort() size "
905 << strptr.size() <<
" depth " << depth;
// Time in this scope is attributed to the "inssort" timer entry.
907 ScopedMultiTimerSwitch sts_inssort(
mtimer_,
"inssort");
909 ctx_.donesize(strptr.size());
914 <<
"sort_mkqs_cache() size " << strptr.size() <<
" depth " << depth;
// Less-than partition: empty, small enough for insertion sort, or handled by
// the call whose argument tail follows (callee not shown).
938 if (ms.num_lt_ == 0) {
941 else if (ms.num_lt_ <
ctx_.inssort_threshold) {
943 insertion_sort_cache<false>(ms.strptr_.sub(0, ms.num_lt_),
944 ms.cache_, ms.depth_);
945 ctx_.donesize(ms.num_lt_);
950 ms.strptr_.sub(0, ms.num_lt_),
951 ms.cache_, ms.depth_,
false);
// Equal partition; it must not be empty.
955 else if (ms.idx_ == 2) {
956 StringPtr sp = ms.strptr_.sub(ms.num_lt_, ms.num_eq_);
958 assert(ms.num_eq_ > 0);
// No recursion needed: copy back, fill LCPs with depth + lcp_eq_, and report
// the strings as done.
960 if (!ms.eq_recurse_) {
961 StringPtr spb = sp.copy_back();
962 spb.fill_lcp(ms.depth_ + ms.lcp_eq_);
963 ctx_.donesize(spb.size());
// Small equal partition: insertion sort with the CacheDirty template flag set.
965 else if (ms.num_eq_ <
ctx_.inssort_threshold) {
966 ScopedMultiTimerSwitch sts_inssort(
mtimer_,
"inssort");
967 insertion_sort_cache<true>(sp, ms.cache_ + ms.num_lt_,
969 ctx_.donesize(ms.num_eq_);
// Argument tail of a call (callee not shown): cache offset of the equal
// partition, depth advanced by one key width, and `true` as last argument.
974 ms.cache_ + ms.num_lt_,
975 ms.depth_ +
sizeof(
key_type),
true);
// Greater-than partition, located after the less and equal partitions.
979 else if (ms.idx_ == 3) {
980 StringPtr sp = ms.strptr_.sub(
981 ms.num_lt_ + ms.num_eq_, ms.num_gt_);
983 if (ms.num_gt_ == 0) {
986 else if (ms.num_gt_ <
ctx_.inssort_threshold) {
987 ScopedMultiTimerSwitch sts_inssort(
mtimer_,
"inssort");
988 insertion_sort_cache<false>(
989 sp, ms.cache_ + ms.num_lt_ + ms.num_eq_, ms.depth_);
990 ctx_.donesize(ms.num_gt_);
995 ms.cache_ + ms.num_lt_ + ms.num_eq_,
// Work sharing is considered only when enabled and the thread pool reports an
// idle thread.
1010 if (
ctx_.enable_work_sharing &&
ctx_.threads_.has_idle()) {
// Fragments of the routine that turns the unprocessed partitions of an mkqs
// step `ms` into separately enqueued jobs, followed by the log line of the
// all-substeps-done handler. NOTE(review): function headers and several
// statements are not visible here.
// Loop bounded by the constant 8 (body not shown).
1019 for (
unsigned int fl = 0; fl < 8; ++fl)
1026 <<
"Freeing top level of PS5SmallsortJob's mkqs stack - size "
// Less-than partition not yet started and non-empty: enqueue it at the same
// depth.
1033 if (ms.idx_ == 0 && ms.num_lt_ != 0)
1036 ctx_.enqueue(
this, ms.strptr_.sub(0, ms.num_lt_), ms.depth_);
1040 assert(ms.num_eq_ > 0);
// Equal partition: either enqueue it one key width deeper, or fill its LCPs
// with depth + lcp_eq_ and report it as done.
1044 if (ms.eq_recurse_) {
1046 ctx_.enqueue(
this, sp, ms.depth_ +
sizeof(
key_type));
1050 spb.
fill_lcp(ms.depth_ + ms.lcp_eq_);
1051 ctx_.donesize(ms.num_eq_);
// Greater-than partition not yet processed and non-empty.
1054 if (ms.idx_ <= 2 && ms.num_gt_ != 0)
1058 this, ms.strptr_.sub(ms.num_lt_ + ms.num_eq_, ms.num_gt_),
1072 <<
"SmallSort[" <<
depth_ <<
"] "
1073 <<
"all substeps done -> LCP calculation";
// Fragments of the parallel sample sort step: type/constant declarations, the
// tail of the constructor and the sampling job. NOTE(review): the class header
// and most member declarations are not visible here.
1095 template <
typename Context,
typename StringPtr>
1101 typedef typename Context::key_type
key_type;
// Atomic counter member.
1117 std::atomic<size_t>
pwork_;
// Classifier tree parameters taken from the context's classifier type.
1122 static const size_t treebits_ = Context::Classify::treebits;
1123 static const size_t num_splitters_ = Context::Classify::num_splitters;
1154 <<
" flip=" <<
strptr_.flipped();
// The sampling job is queued on the context's thread pool; the lambda captures
// `this`, so this object has to stay alive until the job has run.
1156 ctx.threads_.enqueue([
this]() {
sample(); });
// Count this step in the context's parallel sample sort statistics.
1157 ++ctx.para_ss_steps;
1167 TLX_LOGC(
ctx_.debug_jobs) <<
"Process SampleJob @ " <<
this;
1169 const size_t oversample_factor = 2;
1173 size_t n = strset.size();
// The generator is seeded with the address of the sample buffer, so the seed
// depends on where that buffer happens to live.
1177 std::minstd_rand rng(
reinterpret_cast<uintptr_t
>(samples.
data()));
// Each sample is the key (at depth_) of the string at index rng() % n.
1179 for (
size_t i = 0; i < sample_size; ++i)
1180 samples[i] = get_key_at<key_type>(strset, rng() % n,
depth_);
1182 std::sort(samples.
begin(), samples.
end());
// One counting job per part is queued on the thread pool.
1188 for (
unsigned int p = 0; p <
parts_; ++p) {
1189 ctx_.threads_.enqueue([
this, p]() {
count(p); });
// Counting job for part p, followed by fragments of the handler that runs once
// counting has finished. NOTE(review): the computation of strB/strE, the body
// of the counting loop and the prefix sum statements are not visible here.
1196 void count(
unsigned int p) {
1198 TLX_LOGC(
ctx_.debug_jobs) <<
"Process CountJob " << p <<
" @ " <<
this;
// Clamp the end iterator so the part's range is never negative.
1204 if (strE < strB) strE = strB;
// Per-part bucket cache and bucket counter arrays.
1207 uint16_t* bktcache =
bktcache_[p].data();
1211 size_t* bkt =
bkt_[p].data();
// Zero all bktnum_ counters of this part.
1212 memset(bkt, 0,
bktnum_ *
sizeof(
size_t));
// Visit one bucket cache entry per string of this part.
1214 for (uint16_t* bc = bktcache; bc != bktcache + (strE - strB); ++bc)
1223 TLX_LOGC(
ctx_.debug_jobs) <<
"Finishing CountJob " <<
this <<
" with prefixsum";
1226 if (
ctx_.use_only_first_sortstep)
// Nested loops over all buckets and, per bucket, over all parts.
1231 for (
unsigned int i = 0; i <
bktnum_; ++i) {
1232 for (
unsigned int p = 0; p <
parts_; ++p) {
// Loop over all parts (body not shown; presumably queues the distribute jobs
// -- confirm).
1240 for (
unsigned int p = 0; p <
parts_; ++p) {
// Fragments of the distribution job for part p, of the handler that runs after
// distribution and enqueues one job per bucket, and of the final
// all-substeps-done handler. NOTE(review): function headers, loop headers and
// several branch conditions are not visible here.
1250 TLX_LOGC(
ctx_.debug_jobs) <<
"Process DistributeJob " << p <<
" @ " <<
this;
// Clamp the end iterator so the part's range is never negative.
1256 if (strE < strB) strE = strB;
1260 typename StringSet::Iterator sbegin = sorted.begin();
1262 uint16_t* bktcache =
bktcache_[p].data();
1263 size_t* bkt =
bkt_[p].data();
// For every string of the part, the boundary of its bucket is decremented
// first and the string is then moved into that slot of `sorted`.
1265 for (
StrIterator str = strB; str != strE; ++str, ++bktcache)
1266 *(sbegin + --bkt[*bktcache]) = std::move(*str);
1279 <<
"Finishing DistributeJob " <<
this <<
" with enqueuing subjobs";
// The boundaries of part 0 are used from here on; the first one must be 0.
1281 size_t* bkt =
bkt_[0].data();
1285 assert(bkt[0] == 0);
// "<" bucket: a single string is only copied back; larger buckets are enqueued
// as new jobs (depth argument not shown).
1295 size_t bktsize = bkt[i + 1] - bkt[i];
1299 else if (bktsize == 1) {
1300 strptr_.flip(bkt[i], 1).copy_back();
1306 <<
"Recurse[" <<
depth_ <<
"]: < bkt " << bkt[i]
1307 <<
" size " << bktsize <<
" lcp "
1310 ctx_.enqueue(
this,
strptr_.flip(bkt[i], bktsize),
// "=" bucket: a single string is copied back; a bucket on the "is done!" path
// is copied back and reported via donesize(); otherwise it is enqueued.
1315 bktsize = bkt[i + 1] - bkt[i];
1319 else if (bktsize == 1) {
1320 strptr_.flip(bkt[i], 1).copy_back();
1328 <<
"Recurse[" <<
depth_ <<
"]: = bkt " << bkt[i]
1329 <<
" size " << bktsize <<
" is done!";
1330 StringPtr sp =
strptr_.flip(bkt[i], bktsize).copy_back();
1333 ctx_.donesize(bktsize);
1337 <<
"Recurse[" <<
depth_ <<
"]: = bkt " << bkt[i]
1338 <<
" size " << bktsize <<
" lcp keydepth!";
1340 ctx_.enqueue(
this,
strptr_.flip(bkt[i], bktsize),
// ">" bucket, logged with "no lcp".
1347 size_t bktsize = bkt[i + 1] - bkt[i];
1352 else if (bktsize == 1) {
1353 strptr_.flip(bkt[i], 1).copy_back();
1358 <<
"Recurse[" <<
depth_ <<
"]: > bkt " << bkt[i]
1359 <<
" size " << bktsize <<
" no lcp";
// Scoped timer against the context's "para_ss" entry.
1374 ScopedMultiTimer smt(
ctx_.mtimer,
"para_ss");
1377 <<
"pSampleSortStep[" <<
depth_ <<
"]: all substeps done.";
// Start of the LCP pass call for this step's bucket count (arguments not
// shown).
1379 ps5_sample_sort_lcp<bktnum_>(
// Out-of-class definition of a member template of a class templated on
// Parameters; it takes the parent step, the strings and the depth, and queues
// a job on the thread pool. Call sites in this file invoke it as
// enqueue(step, strptr, depth). NOTE(review): the return type/name line and
// the construction started on the line before "* this" are not visible here.
1392 template <
typename Parameters>
1393 template <
typename StringPtr>
1395 PS5SortStep* pstep,
const StringPtr& strptr,
size_t depth) {
// First branch: parallel sample sort must be enabled, and either the input is
// larger than sequential_threshold() or only the first sort step is used.
1396 if (this->enable_parallel_sample_sort &&
1397 (strptr.size() > sequential_threshold() ||
1398 this->use_only_first_sortstep)) {
// Inputs with fewer than 2^32 strings take this branch (the job type built
// here is not shown; presumably one with a narrower bucket size type).
1402 if (strptr.size() < (1LLU << 32)) {
1404 *
this, pstep, strptr, depth);
// The job pointer is captured by value and run() is invoked on a pool thread.
1405 threads_.
enqueue([j]() { j->run(); });
// Otherwise a small-sort job with 64-bit bucket sizes is heap-allocated.
// NOTE(review): the matching delete is not visible in this block; presumably
// the job releases itself when it completes -- confirm.
1408 auto j =
new PS5SmallsortJob<PS5Context, StringPtr, uint64_t>(
1409 *
this, pstep, strptr, depth);
1410 threads_.
enqueue([j]() { j->run(); });
// Fragments of the top-level driver: it builds a context, enqueues the whole
// input as the first job, runs the thread pool until it is empty and logs
// statistics. NOTE(review): the function name line, the Context/BigSortStep
// typedefs and the start of the log statement are not visible here.
1419 template <
typename PS5Parameters,
typename StringPtr>
// NOTE(review): std::thread::hardware_concurrency() is allowed to return 0
// when the value cannot be determined; how the context treats a zero thread
// count is not visible here -- confirm.
1423 Context ctx(std::thread::hardware_concurrency());
// Both size counters start at the number of input strings.
1424 ctx.total_size = strptr.
size();
1425 ctx.rest_size = strptr.
size();
1426 ctx.num_threads = ctx.threads_.size();
1429 timer.
start(
"sort");
// The root job has no parent step.
1431 ctx.enqueue(
nullptr, strptr, depth);
// Returns once the thread pool has no work left.
1432 ctx.threads_.loop_until_empty();
// When rest-size accounting is enabled, every string must have been reported
// as done.
1436 assert(!ctx.enable_rest_size || ctx.rest_size == 0);
1442 <<
" sizeof(key_type)=" <<
sizeof(
typename PS5Parameters::key_type)
1443 <<
" splitter_treebits=" <<
size_t(BigSortStep::treebits_)
1444 <<
" num_splitters=" << size_t(BigSortStep::num_splitters_)
1445 <<
" num_threads=" << ctx.num_threads
1446 <<
" enable_work_sharing=" << size_t(ctx.enable_work_sharing)
1447 <<
" use_restsize=" << size_t(ctx.enable_rest_size)
1448 <<
" tm_para_ss=" << ctx.mtimer.get(
"para_ss")
1449 <<
" tm_seq_ss=" << ctx.mtimer.get(
"sequ_ss")
1450 <<
" tm_mkqs=" << ctx.mtimer.get(
"mkqs")
1451 <<
" tm_inssort=" << ctx.mtimer.get(
"inssort")
1452 <<
" tm_total=" << ctx.mtimer.total()
// Thread count times the wall-clock total, minus the time accounted for by
// the per-phase timers.
1454 << (ctx.num_threads * timer.
total()) - ctx.mtimer.total()
1455 <<
" steps_para_sample_sort=" << ctx.para_ss_steps
1456 <<
" steps_seq_sample_sort=" << ctx.sequ_ss_steps
1457 <<
" steps_base_sort=" << ctx.base_sort_steps;
// Entry point fragment that sorts with a temporary shadow array of the same
// size as the input. NOTE(review): the return type/name line and the StringSet
// typedef are not visible here; `memory` defaults to 0 and is not used in any
// of the visible statements.
1462 template <
typename PS5Parameters,
typename StringPtr>
1465 const StringPtr& strptr,
size_t depth,
size_t memory = 0) {
1469 const StringSet& strset = strptr.active();
1471 typedef StringShadowPtr<StringSet> StringShadowPtr;
1472 typedef typename StringSet::Container Container;
// Allocate the shadow container with one slot per input string.
1475 Container shadow = strset.allocate(strset.size());
// Pair the input string set with a string set built over the shadow.
1476 StringShadowPtr new_strptr(strset, StringSet(shadow));
1478 parallel_sample_sort_base<PS5Parameters>(new_strptr, depth);
// Released only after the sort call has returned normally.
1480 StringSet::deallocate(shadow);
// Overload fragment enabled only when StringPtr::with_lcp is true; like the
// non-LCP variant it sorts with a temporary shadow array of the input's size.
// NOTE(review): the name line, the StringSet typedef and the construction of
// new_strptr are not visible here; `memory` defaults to 0 and is not used in
// any of the visible statements.
1485 template <
typename PS5Parameters,
typename StringPtr>
1486 typename enable_if<StringPtr::with_lcp, void>::type
1488 const StringPtr& strptr,
size_t depth,
size_t memory = 0) {
1492 typedef typename StringPtr::LcpType LcpType;
1493 const StringSet& strset = strptr.active();
1496 typedef typename StringSet::Container Container;
// Allocate the shadow container with one slot per input string.
1499 Container shadow = strset.allocate(strset.size());
1502 parallel_sample_sort_base<PS5Parameters>(new_strptr, depth);
// Released only after the sort call has returned normally.
1504 StringSet::deallocate(shadow);
// Convenience wrapper fragment: forwards strptr, depth and memory unchanged to
// parallel_sample_sort_params with PS5ParametersDefault as parameter set and
// returns its result. NOTE(review): the return type/name line is not visible
// here.
1509 template <
typename StringPtr>
1511 const StringPtr& strptr,
size_t depth,
size_t memory) {
1512 return parallel_sample_sort_params<PS5ParametersDefault>(
1513 strptr, depth, memory);
1519 #endif // !TLX_SORT_STRINGS_PARALLEL_SAMPLE_SORT_HEADER