14 #ifndef STXXL_SORT_STREAM_HEADER
15 #define STXXL_SORT_STREAM_HEADER
17 #ifdef STXXL_BOOST_CONFIG
18 #include <boost/config.hpp>
21 #include <stxxl/bits/stream/stream.h>
25 __STXXL_BEGIN_NAMESPACE
// sorted_runs: plain data holder describing a set of sorted runs.
// Only fragments of the definition are visible in this listing.
32 template <
class ValueType,
class TriggerEntryType>
35 typedef TriggerEntryType trigger_entry_type;
36 typedef ValueType value_type;
37 typedef typename trigger_entry_type::bid_type bid_type;
38 typedef stxxl::int64 size_type;
// A run is a vector of trigger entries; the run creators below store one
// entry per written block (block id plus the block's first element).
39 typedef std::vector<trigger_entry_type> run_type;
// runs[i] holds the trigger entries of run i, runs_sizes[i] its element count.
42 std::vector<run_type> runs;
43 std::vector<unsigned_type> runs_sizes;
// In-memory copy of the input, used by the small-input paths of the run
// creators and read back by runs_merger when it is non-empty.
50 std::vector<ValueType> small_;
// A freshly constructed object describes zero elements.
52 sorted_runs() : elements(0) { }
// Walks over all runs and hands each run's block range to a deallocation
// call. NOTE(review): the call itself is not visible here; presumably
// bm->delete_blocks(...) as at the other trigger_entry_iterator call sites.
58 void deallocate_blocks()
61 for (unsigned_type i = 0; i < runs.size(); ++i)
63 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i].begin()),
64 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i].end()));
// Generic runs_creator (fragment): reads from an input stream and produces
// a sorted_runs object. The leading template parameters (Input_, Cmp_) are
// outside the visible part of this listing.
81 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(
typename Input_::value_type),
82 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
89 typedef Cmp_ cmp_type;
90 typedef typename Input_::value_type value_type;
93 typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
94 typedef sorted_runs<value_type, trigger_entry_type> sorted_runs_type;
97 typedef typename sorted_runs_type::run_type run_type;
98 sorted_runs_type result_;
// Set to true once the result has been computed, so the work is done once.
100 bool result_computed;
102 void compute_result();
// Sorts the first `elements` elements stored in the block array `run`.
// Blocks with filler go through the ArrayOfSequencesIterator /
// TwoToOneDimArrayRowAdaptor wrappers; otherwise std::sort runs directly
// over the range starting at run[0].elem.
// NOTE(review): the direct path treats consecutive blocks as one contiguous
// element array - confirm this is guaranteed when has_filler is false.
103 void sort_run(
block_type * run, unsigned_type elements)
105 if (block_type::has_filler)
108 ArrayOfSequencesIterator<
111 ArrayOfSequencesIterator<
115 TwoToOneDimArrayRowAdaptor<
118 TwoToOneDimArrayRowAdaptor<
125 std::sort(run[0].elem, run[0].elem + elements, cmp);
// Constructor initializer list: m_ is memory_to_use converted to a block
// count, scaled down by sort_memory_usage_factor().
134 input(i), cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), result_computed(false)
// At least two blocks (times the usage factor) must fit into the budget.
136 assert(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use);
// Lazy evaluation guard: the flag is raised after the first computation.
144 if (!result_computed)
147 result_computed =
true;
// Optional statistics dump after run formation.
148 #ifdef STXXL_PRINT_STAT_AFTER_RF
149 STXXL_MSG(*stats::get_instance());
// compute_result: pulls all elements from `input`, sorts them chunk by chunk
// in memory and writes each chunk to external blocks as one run, recording
// trigger entries and sizes in result_. Two buffer halves (Blocks1/Blocks2)
// are alternated so that one can be filled while the other is written.
// Many lines of this function are missing from the listing.
157 template <
class Input_,
class Cmp_,
unsigned BlockSize_,
class AllocStr_>
158 void runs_creator<Input_, Cmp_, BlockSize_, AllocStr_>::compute_result()
// Half of the available blocks form one run buffer.
161 unsigned_type m2 = m_ / 2;
162 const unsigned_type el_in_run = m2 * block_type::size;
163 STXXL_VERBOSE1(
"runs_creator::compute_result m2=" << m2);
164 unsigned_type pos = 0;
// Without STXXL_SMALL_INPUT_PSORT_OPT both halves are allocated up front.
// NOTE(review): the matching #else / #if / #endif lines are not visible, so
// it is unclear which of the following alternatives are actually compiled.
166 #ifndef STXXL_SMALL_INPUT_PSORT_OPT
167 block_type * Blocks1 =
new block_type[m2 * 2];
170 block_type * Blocks1 =
new block_type[1];
172 while (!input.empty() && pos != block_type::size)
174 Blocks1[pos / block_type::size][pos % block_type::size] = *input;
// Small-input variant: collect up to one block of elements in small_.
180 while (!input.empty() && pos != block_type::size)
182 result_.small_.push_back(*input);
187 block_type * Blocks1;
// A full block was collected: allocate the real buffers and move the
// collected elements into the first block.
189 if (pos == block_type::size)
191 block_type * NewBlocks =
new block_type[m2 * 2];
192 std::copy(result_.small_.begin(), result_.small_.end(), NewBlocks[0].begin());
193 result_.small_.clear();
// Fewer elements than one block: keep them in small_, sorted in memory.
199 STXXL_VERBOSE1(
"runs_creator: Small input optimization, input length: " << pos);
200 result_.elements = pos;
201 std::sort(result_.small_.begin(), result_.small_.end(), cmp);
// Fill the first run buffer (up to el_in_run elements) and sort it.
206 while (!input.empty() && pos != el_in_run)
208 Blocks1[pos / block_type::size][pos % block_type::size] = *input;
214 sort_run(Blocks1, pos);
215 result_.elements = pos;
// Whole input is shorter than one block: hand it over via small_ instead of
// writing a run.
216 if (pos < block_type::size && input.empty())
218 STXXL_VERBOSE1(
"runs_creator: Small input optimization, input length: " << pos);
219 result_.small_.resize(pos);
220 std::copy(Blocks1[0].begin(), Blocks1[0].begin() + pos, result_.small_.begin());
// Second half of the single allocation.
226 block_type * Blocks2 = Blocks1 + m2;
// One trigger entry per block needed for the elements read so far.
232 unsigned_type cur_run_size = div_and_round_up(pos, block_type::size);
233 run.resize(cur_run_size);
235 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
236 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
239 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
241 result_.runs_sizes.push_back(pos);
// Pad the unused tail of the buffer with the comparator's maximum value.
244 for ( ; pos != el_in_run; ++pos)
245 Blocks1[pos / block_type::size][pos % block_type::size] = cmp.max_value();
// Record each block's first element as its trigger value and start the write.
248 for (i = 0; i < cur_run_size; ++i)
250 run[i].value = Blocks1[i][0];
251 write_reqs[i] = Blocks1[i].write(run[i].bid);
254 result_.runs.push_back(run);
259 wait_all(write_reqs, write_reqs + cur_run_size);
265 STXXL_VERBOSE1(
"Filling the second part of the allocated blocks");
267 while (!input.empty() && pos != el_in_run)
269 Blocks2[pos / block_type::size][pos % block_type::size] = *input;
273 result_.elements += pos;
// Both halves are sorted together starting at Blocks1, the blocks of the
// previously written run are released, and run 0 is rebuilt with the new
// size (see runs_sizes[0] / runs[0] below).
// NOTE(review): presumably this path is taken when the input is exhausted
// after the second fill; the guarding condition is not visible.
279 sort_run(Blocks1, pos);
280 wait_all(write_reqs, write_reqs + cur_run_size);
281 bm->
delete_blocks(trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
282 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end()));
284 cur_run_size = div_and_round_up(pos, block_type::size);
285 run.resize(cur_run_size);
287 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
288 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
291 result_.runs_sizes[0] = pos;
// Pad up to the end of the combined (two-half) buffer.
293 for ( ; pos != 2 * el_in_run; ++pos)
294 Blocks1[pos / block_type::size][pos % block_type::size] = cmp.max_value();
297 assert(cur_run_size > m2);
// The first m2 blocks reuse the existing request slots (waiting for the
// earlier request in each slot first); the remaining blocks use write_reqs1.
299 for (i = 0; i < m2; ++i)
301 run[i].value = Blocks1[i][0];
302 write_reqs[i]->
wait();
303 write_reqs[i] = Blocks1[i].write(run[i].bid);
308 for ( ; i < cur_run_size; ++i)
310 run[i].value = Blocks1[i][0];
311 write_reqs1[i - m2] = Blocks1[i].write(run[i].bid);
314 result_.runs[0] = run;
316 wait_all(write_reqs, write_reqs + m2);
318 wait_all(write_reqs1, write_reqs1 + cur_run_size - m2);
319 delete[] write_reqs1;
// Second buffer is sorted and written as a run of its own.
326 sort_run(Blocks2, pos);
328 cur_run_size = div_and_round_up(pos, block_type::size);
329 run.resize(cur_run_size);
331 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
332 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
335 for (i = 0; i < cur_run_size; ++i)
337 run[i].value = Blocks2[i][0];
// Wait for the previous write that used this request slot before reusing it.
338 write_reqs[i]->
wait();
339 write_reqs[i] = Blocks2[i].write(run[i].bid);
// Here the buffer is expected to be completely full (no padding needed).
341 assert((pos % el_in_run) == 0);
343 result_.runs.push_back(run);
344 result_.runs_sizes.push_back(pos);
// Main loop: fill Blocks1, sort, pad, write, then swap the two buffers so
// the next chunk is read into the other half.
346 while (!input.empty())
349 while (!input.empty() && pos != el_in_run)
351 Blocks1[pos / block_type::size][pos % block_type::size] = *input;
355 result_.elements += pos;
356 sort_run(Blocks1, pos);
357 cur_run_size = div_and_round_up(pos, block_type::size);
358 run.resize(cur_run_size);
360 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.begin()),
361 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.end())
364 result_.runs_sizes.push_back(pos);
367 for ( ; pos != el_in_run; ++pos)
368 Blocks1[pos / block_type::size][pos % block_type::size] = cmp.max_value();
371 for (i = 0; i < cur_run_size; ++i)
373 run[i].value = Blocks1[i][0];
374 write_reqs[i]->
wait();
375 write_reqs[i] = Blocks1[i].write(run[i].bid);
377 result_.runs.push_back(run);
379 std::swap(Blocks1, Blocks2);
382 wait_all(write_reqs, write_reqs + m2);
// Blocks1/Blocks2 may have been swapped; the lower address is the start of
// the single new[] allocation (Blocks2 was set to Blocks1 + m2).
384 delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
// use_push: tag type parameterized by the element type; only its value_type
// typedef is visible here. It selects the runs_creator variant that follows.
395 template <
class ValueType_>
398 typedef ValueType_ value_type;
// runs_creator variant fed through push() (fragment). Elements are buffered
// in Blocks1; each time the buffer holds el_in_run elements it is sorted and
// written out as a run, and the two buffer halves are swapped.
426 typedef Cmp_ cmp_type;
427 typedef ValueType_ value_type;
430 typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
431 typedef sorted_runs<value_type, trigger_entry_type> sorted_runs_type;
434 typedef typename sorted_runs_type::run_type run_type;
435 sorted_runs_type result_;
// Set once the result has been requested; push() asserts it is still false.
438 bool output_requested;
440 const unsigned_type m2;
441 const unsigned_type el_in_run;
// Number of elements currently buffered.
442 unsigned_type cur_el;
// Sorts the first `elements` elements of the block array `run`; same two
// code paths (filler-aware wrappers vs. plain std::sort) as in the generic
// runs_creator.
448 void sort_run(
block_type * run, unsigned_type elements)
450 if (block_type::has_filler)
453 ArrayOfSequencesIterator<
454 block_type,
typename block_type::value_type, block_type::size
456 ArrayOfSequencesIterator<
457 block_type,
typename block_type::value_type, block_type::size
460 TwoToOneDimArrayRowAdaptor<
463 TwoToOneDimArrayRowAdaptor<
470 std::sort(run[0].elem, run[0].elem + elements, cmp);
// Final flush of the partially filled buffer: sort what is buffered and add
// it to the element total.
478 unsigned_type cur_el_reg = cur_el;
479 sort_run(Blocks1, cur_el_reg);
480 result_.elements += cur_el_reg;
// Less than one block buffered and nothing else was ever pushed: return the
// data through small_ instead of writing a run.
481 if (cur_el_reg < unsigned_type(block_type::size) &&
482 unsigned_type(result_.elements) == cur_el_reg)
484 STXXL_VERBOSE1(
"runs_creator(use_push): Small input optimization, input length: " << cur_el_reg);
485 result_.small_.resize(cur_el_reg);
486 std::copy(Blocks1[0].begin(), Blocks1[0].begin() + cur_el_reg, result_.small_.begin());
490 const unsigned_type cur_run_size = div_and_round_up(cur_el_reg, block_type::size);
491 run.resize(cur_run_size);
494 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.
begin()),
495 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.
end())
498 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
500 result_.runs_sizes.push_back(cur_el_reg);
// Pad the rest of the buffer with the comparator's maximum value.
503 for ( ; cur_el_reg != el_in_run; ++cur_el_reg)
504 Blocks1[cur_el_reg / block_type::size][cur_el_reg % block_type::size] = cmp.max_value();
// Store trigger values and issue writes; a request slot that is still in
// use is waited for before being reused.
508 for ( ; i < cur_run_size; ++i)
510 run[i].value = Blocks1[i][0];
511 if (write_reqs[i].
get())
512 write_reqs[i]->
wait();
514 write_reqs[i] = Blocks1[i].write(run[i].bid);
516 result_.runs.push_back(run);
// Wait for every outstanding write request.
518 for (i = 0; i < m2; ++i)
519 if (write_reqs[i].
get())
520 write_reqs[i]->
wait();
// Release the buffer allocation (the lower of the two possibly swapped
// pointers is the allocation start) and clear both pointers.
525 delete[] ((Blocks1 < Blocks2) ? Blocks1 : Blocks2);
527 Blocks1 = Blocks2 = NULL;
// Constructor initializers: m_ is the memory budget expressed in blocks;
// Blocks2 points m2 blocks into the Blocks1 allocation.
535 cmp(c), m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()), output_requested(false),
540 Blocks2(Blocks1 + m2),
543 assert(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use);
// NOTE(review): presumably the destructor, releasing resources if the
// result was never requested - the guarded statement is not visible.
548 if (!output_requested)
// Adds one element. While the buffer has room the element is stored (the
// store itself is not visible in this listing); once the buffer is full it
// is sorted and written out as a complete run.
554 void push(
const value_type & val)
556 assert(output_requested ==
false);
557 unsigned_type cur_el_reg = cur_el;
558 if (cur_el_reg < el_in_run)
565 assert(el_in_run == cur_el);
569 sort_run(Blocks1, el_in_run);
570 result_.elements += el_in_run;
572 const unsigned_type cur_run_size = div_and_round_up(el_in_run, block_type::size);
573 run.resize(cur_run_size);
576 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.
begin()),
577 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(run.
end())
580 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
582 result_.runs_sizes.push_back(el_in_run);
584 for (unsigned_type i = 0; i < cur_run_size; ++i)
586 run[i].value = Blocks1[i][0];
587 if (write_reqs[i].
get())
588 write_reqs[i]->
wait();
590 write_reqs[i] = Blocks1[i].write(run[i].bid);
593 result_.runs.push_back(run);
// Continue filling the other half while this one is being written.
595 std::swap(Blocks1, Blocks2);
// Result accessor fragment: the first request flips output_requested.
605 if (!output_requested)
608 output_requested =
true;
610 #ifdef STXXL_PRINT_STAT_AFTER_RF
611 STXXL_MSG(*stats::get_instance());
// from_sorted_sequences: tag type parameterized by the element type; only
// its value_type typedef is visible here. It selects the runs_creator
// variant that follows.
625 template <
class ValueType_>
628 typedef ValueType_ value_type;
// runs_creator variant that stores pushed elements block by block without
// sorting them (fragment). No sort call appears in the visible code, so the
// caller is presumably expected to push each run in sorted order.
653 typedef ValueType_ value_type;
656 typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
657 typedef AllocStr_ alloc_strategy_type;
661 typedef Cmp_ cmp_type;
662 typedef sorted_runs<value_type, trigger_entry_type> sorted_runs_type;
665 typedef typename sorted_runs_type::run_type run_type;
666 sorted_runs_type result_;
// offset: write position inside cur_block; iblock: index of the current
// block within the current run.
670 unsigned_type offset;
671 unsigned_type iblock;
673 alloc_strategy_type alloc_strategy;
// Constructor initializers: memory budget in blocks, first free block taken
// from the writer.
682 m_(memory_to_use / BlockSize_ / sort_memory_usage_factor()),
684 cur_block(writer.get_free_block()),
689 assert(2 * BlockSize_ * sort_memory_usage_factor() <= memory_to_use);
// Appends one element to the current block. When the block becomes full a
// trigger entry is added for it, its first element is recorded, and the
// block is passed to the writer, which supplies the block to continue with.
694 void push(
const value_type & val)
696 assert(offset < block_type::size);
698 (*cur_block)[offset] = val;
701 if (offset == block_type::size)
707 result_.runs.resize(irun + 1);
708 result_.runs[irun].resize(iblock + 1);
711 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
712 result_.runs[irun].begin() + iblock),
713 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
714 result_.runs[irun].end())
717 result_.runs[irun][iblock].value = (*cur_block)[0];
718 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
// Run termination fragment. Nothing buffered and no block written means the
// current run is empty (the action taken for that case is not visible).
730 if (offset == 0 && iblock == 0)
// Record the element count of the run being closed.
734 result_.runs_sizes.resize(irun + 1);
735 result_.runs_sizes.back() = iblock * block_type::size + offset;
// Pad the partially filled last block with the comparator's maximum value.
739 while (offset != block_type::size)
741 (*cur_block)[offset] = cmp.max_value();
748 result_.runs.resize(irun + 1);
749 result_.runs[irun].resize(iblock + 1);
752 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
753 result_.runs[irun].begin() + iblock),
754 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
755 result_.runs[irun].end())
758 result_.runs[irun][iblock].value = (*cur_block)[0];
759 cur_block = writer.write(cur_block, result_.runs[irun][iblock].bid);
// check_sorted_runs: diagnostic helper (fragment). For every run it reads
// all blocks, verifies that each trigger value matches the first element of
// its block, and verifies that the run's elements are in sorted order.
785 template <
class RunsType_,
class Cmp_>
788 typedef typename RunsType_::block_type block_type;
789 typedef typename block_type::value_type value_type;
790 STXXL_VERBOSE2(
"Elements: " << sruns.elements);
791 unsigned_type nruns = sruns.runs.size();
792 STXXL_VERBOSE2(
"Runs: " << nruns);
793 unsigned_type irun = 0;
794 for (irun = 0; irun < nruns; ++irun)
// Load the complete run into memory.
796 const unsigned_type nblocks = sruns.runs[irun].size();
797 block_type * blocks =
new block_type[nblocks];
799 for (unsigned_type j = 0; j < nblocks; ++j)
801 reqs[j] = blocks[j].read(sruns.runs[irun][j].bid);
804 for (unsigned_type j = 0; j < nblocks; ++j)
// Trigger and first element are treated as different if either compares
// less than the other under cmp.
806 if (cmp(blocks[j][0], sruns.runs[irun][j].value) ||
807 cmp(sruns.runs[irun][j].value, blocks[j][0]))
809 STXXL_ERRMSG(
"check_sorted_runs wrong trigger in the run");
// Order check over the first runs_sizes[irun] elements only.
813 if (!stxxl::is_sorted(
815 ArrayOfSequencesIterator<
816 block_type,
typename block_type::value_type, block_type::size
818 ArrayOfSequencesIterator<
819 block_type,
typename block_type::value_type, block_type::size
820 >(blocks, sruns.runs_sizes[irun]),
822 TwoToOneDimArrayRowAdaptor<
823 block_type, value_type, block_type::size
825 TwoToOneDimArrayRowAdaptor<
826 block_type, value_type, block_type::size
830 sruns.runs_sizes[irun]
835 STXXL_ERRMSG(
"check_sorted_runs wrong order in the run");
843 STXXL_MSG(
"Checking runs finished successfully");
// runs_merger (fragment): type definitions, data members and the
// prefetcher teardown helper. The class merges the runs described by a
// sorted_runs object and exposes the result one element at a time.
856 template <
class RunsType_,
858 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
861 typedef RunsType_ sorted_runs_type;
862 typedef AllocStr_ alloc_strategy;
863 typedef typename sorted_runs_type::size_type size_type;
864 typedef Cmp_ value_cmp;
865 typedef typename sorted_runs_type::run_type run_type;
866 typedef typename sorted_runs_type::block_type block_type;
867 typedef typename block_type::bid_type bid_type;
869 typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
870 typedef sort_local::run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
871 typedef loser_tree<run_cursor_type, run_cursor2_cmp_type, block_type::size> loser_tree_type;
873 typedef stxxl::int64 diff_type;
// A sequence is a [begin, end) iterator pair into one block; seqs/buffers
// are used by the parallel multiway merge path.
874 typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
875 typedef typename std::vector<sequence>::size_type seqs_size_type;
876 std::vector<sequence> * seqs;
877 std::vector<block_type *> * buffers;
// Copy of the runs description passed to the constructor.
880 sorted_runs_type sruns;
// Number of elements not yet delivered to the consumer.
883 size_type elements_remaining;
// Read position inside current_block.
884 unsigned_type buffer_pos;
885 block_type * current_block;
// All trigger entries of all runs, ordered by the constructor.
886 run_type consume_seq;
888 loser_tree_type * losers;
889 int_type * prefetch_seq;
891 #ifdef STXXL_CHECK_ORDER_IN_SORTS
892 typename block_type::value_type last_element;
895 void merge_recursively();
// Frees the prefetch schedule and releases the blocks of all runs. Other
// statements of this helper are not visible in the listing.
897 void deallocate_prefetcher()
905 delete[] prefetch_seq;
909 sruns.deallocate_blocks();
// Sets up the merge machinery. In the parallel case one sequence and one
// buffer slot is created per run and each sequence is set to span its
// buffer; otherwise a loser tree over the prefetcher is created.
// NOTE(review): how (*buffers)[i] is obtained is not visible here.
912 void initialize_current_block()
914 if (do_parallel_merge())
916 #if STXXL_PARALLEL_MULTIWAY_MERGE
918 seqs =
new std::vector<sequence>(nruns);
919 buffers =
new std::vector<block_type *>(nruns);
921 for (unsigned_type i = 0; i < nruns; i++)
924 (*seqs)[i] = std::make_pair((*buffers)[i]->begin(), (*buffers)[i]->end());
936 losers =
new loser_tree_type(prefetcher, nruns, run_cursor2_cmp_type(cmp));
// Produces the next block of merged output in current_block. The parallel
// path repeatedly merges a safe number of elements from the in-memory
// sequences until the block is full or no sequences remain; the sequential
// path delegates to the loser tree.
942 void fill_current_block()
944 if (do_parallel_merge())
946 #if STXXL_PARALLEL_MULTIWAY_MERGE
// rest: number of output slots still to fill in current_block.
948 diff_type rest = block_type::size;
953 diff_type total_size = 0;
// Pass 1: find the smallest "last element" over all non-empty sequences.
955 for (seqs_size_type i = 0; i < (*seqs).size(); i++)
957 if ((*seqs)[i].first == (*seqs)[i].second)
960 if (min_last_element == NULL)
961 min_last_element = &(*((*seqs)[i].second - 1));
963 min_last_element = cmp(*min_last_element, *((*seqs)[i].second - 1)) ? min_last_element : &(*((*seqs)[i].second - 1));
965 total_size += (*seqs)[i].second - (*seqs)[i].first;
966 STXXL_VERBOSE1(
"last " << *((*seqs)[i].second - 1) <<
" block size " << ((*seqs)[i].second - (*seqs)[i].first));
969 assert(min_last_element != NULL);
// NOTE(review): this streams the pointer min_last_element itself, not the
// element it points to.
971 STXXL_VERBOSE1(
"min_last_element " << min_last_element <<
" total size " << total_size + (block_type::size - rest));
973 diff_type less_equal_than_min_last = 0;
// Pass 2: count, per sequence, the elements not greater than that minimum;
// only those can be merged without looking at blocks not yet loaded.
975 for (seqs_size_type i = 0; i < (*seqs).size(); i++)
977 if ((*seqs)[i].first == (*seqs)[i].second)
980 typename block_type::iterator position = std::upper_bound((*seqs)[i].first, (*seqs)[i].second, *min_last_element, cmp);
// NOTE(review): the log text says "greater equal than" while the value is
// the count that is added to less_equal_than_min_last.
981 STXXL_VERBOSE1(
"greater equal than " << position - (*seqs)[i].first);
982 less_equal_than_min_last += position - (*seqs)[i].first;
985 STXXL_VERBOSE1(
"finished loop");
// Never emit more than fits into the remaining part of the block.
987 ptrdiff_t output_size = (std::min)(less_equal_than_min_last, rest);
989 STXXL_VERBOSE1(
"before merge" << output_size);
// Output is written to the tail of current_block that is still unfilled.
991 stxxl::parallel::multiway_merge((*seqs).begin(), (*seqs).end(), current_block->end() - rest, cmp, output_size);
994 STXXL_VERBOSE1(
"after merge");
998 STXXL_VERBOSE1(
"so long");
// Sequences that ran empty are either re-pointed at their buffer or erased
// together with the buffer; the conditions selecting between the two are
// not visible in this listing.
1000 for (seqs_size_type i = 0; i < (*seqs).size(); i++)
1002 if ((*seqs)[i].first == (*seqs)[i].second)
1006 (*seqs)[i].first = (*buffers)[i]->begin();
1007 (*seqs)[i].second = (*buffers)[i]->end();
1008 STXXL_VERBOSE1(
"block ran empty " << i);
1012 (*seqs).erase((*seqs).begin() + i);
1013 (*buffers).erase((*buffers).begin() + i);
1014 STXXL_VERBOSE1(
"seq removed " << i);
1018 }
while (rest > 0 && (*seqs).size() > 0);
// Optional self-check: report the position of any out-of-order neighbours.
1020 #ifdef STXXL_CHECK_ORDER_IN_SORTS
1021 if (!stxxl::is_sorted(current_block->begin(), current_block->end(), cmp))
1023 for (
value_type * i = current_block->begin() + 1; i != current_block->end(); i++)
1024 if (cmp(*i, *(i - 1)))
1026 STXXL_VERBOSE1(
"Error at position " << (i - current_block->begin()));
// Sequential path: the loser tree fills the block.
1041 losers->multi_merge(current_block->elem);
// Constructor (fragment). Copies the runs description, converts the memory
// budget into a block count, then either serves a small in-memory input
// directly or sets up prefetching and produces the first output block.
// r: runs to merge; c: comparator; memory_to_use: budget in bytes
// (divided by block_type::raw_size below).
1055 runs_merger(
const sorted_runs_type & r, value_cmp c, unsigned_type memory_to_use) :
1056 sruns(r), m_(memory_to_use / block_type::raw_size / sort_memory_usage_factor() ), cmp(c),
1057 elements_remaining(r.elements),
1058 current_block(NULL),
1060 #ifdef STXXL_CHECK_ORDER_IN_SORTS
1061 , last_element(cmp.min_value())
// Small-input path: the data lives in sruns.small_; copy it into a single
// block and expose its first element.
1068 if (!sruns.small_.empty())
1071 STXXL_VERBOSE1(
"runs_merger: small input optimization, input length: " << elements_remaining);
1072 assert(elements_remaining == size_type(sruns.small_.size()));
1073 current_block =
new block_type;
1074 std::copy(sruns.small_.begin(), sruns.small_.end(), current_block->begin());
1075 current_value = current_block->elem[0];
1081 #ifdef STXXL_CHECK_ORDER_IN_SORTS
1085 current_block =
new block_type;
1087 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
1089 nruns = sruns.runs.size();
// Diagnostics for the case that the runs cannot be merged in one pass
// (the condition itself is not visible in this listing).
1095 STXXL_ERRMSG(
"The implementation of sort requires more than one merge pass, therefore for a better");
1096 STXXL_ERRMSG(
"efficiency decrease block size of run storage (a parameter of the run_creator)");
1097 STXXL_ERRMSG(
"or increase the amount memory dedicated to the merger.");
1098 STXXL_ERRMSG(
"m = " << m_ <<
" nruns=" << nruns);
1102 STXXL_ERRMSG(
"The merger requires memory to store at least two blocks internally. Aborting.");
// Reduce the number of runs by intermediate merge passes, then re-read it.
1106 merge_recursively();
1108 nruns = sruns.runs.size();
1111 assert(nruns <= m_);
// Total number of blocks over all runs = length of the prefetch sequence.
1118 unsigned_type prefetch_seq_size = 0;
1119 for (i = 0; i < nruns; i++)
1121 prefetch_seq_size += sruns.runs[i].size();
1124 consume_seq.resize(prefetch_seq_size);
1126 prefetch_seq =
new int_type[prefetch_seq_size];
// Concatenate the trigger entries of all runs ...
1128 typename run_type::iterator copy_start = consume_seq.begin();
1129 for (i = 0; i < nruns; i++)
1131 copy_start = std::copy(
1132 sruns.runs[i].begin(),
1133 sruns.runs[i].end(),
// ... and order them with the trigger-entry comparator built from cmp.
1137 std::stable_sort(consume_seq.begin(), consume_seq.end(),
1138 sort_local::trigger_entry_cmp<bid_type, value_type, value_cmp>(cmp));
1140 int_type disks_number = config::get_instance()->disks_number();
// At least two prefetch buffers per disk, more if memory is left over after
// reserving one block per run.
1142 const int_type n_prefetch_buffers = STXXL_MAX(2 * disks_number, (int_type(m_) - int_type(nruns)));
1145 #ifdef SORT_OPTIMAL_PREFETCHING
// Part (3/10) of the buffers beyond 2 per disk is passed to the schedule
// computation.
1147 const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10;
1149 compute_prefetch_schedule(
1152 n_opt_prefetch_buffers,
// Identity schedule: prefetch blocks in consumption order.
// NOTE(review): presumably the #else branch; the directive is not visible.
1155 for (i = 0; i < prefetch_seq_size; ++i)
1156 prefetch_seq[i] = i;
1162 consume_seq.begin(),
1165 nruns + n_prefetch_buffers);
// Produce the first output block and expose its first element.
1171 initialize_current_block();
1172 fill_current_block();
1175 current_value = current_block->elem[0];
// Everything left already sits in current_block: the prefetcher and the
// run blocks are no longer needed.
1178 if (elements_remaining <= block_type::size)
1179 deallocate_prefetcher();
// Stream accessors (fragments; the function headers are not visible).
// Emptiness test: true once every element has been consumed.
1185 return elements_remaining == 0;
// Advance step: one element fewer remains.
1192 --elements_remaining;
// Still inside the current block: just read the next slot.
1194 if (buffer_pos != block_type::size)
1196 current_value = current_block->elem[buffer_pos];
// Block exhausted: merge the next block and restart at its first element.
1203 fill_current_block();
1205 #ifdef STXXL_CHECK_ORDER_IN_SORTS
1206 assert(stxxl::is_sorted(current_block->elem, current_block->elem + current_block->size, cmp));
1207 assert(!cmp(current_block->elem[0], current_value));
1209 current_value = current_block->elem[0];
// The remaining elements all fit in the block just produced, so the
// prefetcher can be released early.
1212 if (elements_remaining <= block_type::size)
1213 deallocate_prefetcher();
// Optional check that the output never decreases.
1217 #ifdef STXXL_CHECK_ORDER_IN_SORTS
1220 assert(!cmp(current_value, last_element));
1221 last_element = current_value;
// Dereference: the element currently at the head of the stream.
1231 return current_value;
// Returns the address of the current element. The address-of expression had
// been corrupted by an entity decoding error ("&curren" turned into a
// currency sign), which left an invalid token in place of `&current_value`.
1238 return &current_value;
// Teardown fragment: release the prefetcher, the output block and the
// external blocks of the runs.
// NOTE(review): the conditions guarding these calls are not visible here.
1246 deallocate_prefetcher();
1249 delete current_block;
1252 sruns.deallocate_blocks();
// Cached copy of the element at the head of the output stream.
1257 value_type current_value;
// merge_recursively: performs one intermediate merge pass. Groups of up to
// merge_factor runs are merged by nested runs_merger objects into new,
// longer runs. Parts of the function are missing from the listing.
1261 template <
class RunsType_,
class Cmp_,
class AllocStr_>
1262 void runs_merger<RunsType_, Cmp_, AllocStr_>::merge_recursively()
1265 unsigned_type ndisks = config::get_instance()->disks_number();
1266 unsigned_type nwrite_buffers = 2 * ndisks;
1268 unsigned_type nruns = sruns.runs.size();
// ceil(log(nruns)/log(m_)) is the number of merge levels needed with fan-in
// m_; the merge factor is the matching root of nruns, rounded up, so the
// levels are balanced.
1269 const unsigned_type merge_factor =
1270 static_cast<unsigned_type
>(ceil(pow(nruns, 1. / ceil(log(
double(nruns)) / log(
double(m_))))));
1271 assert(merge_factor <= m_);
1274 unsigned_type new_nruns = div_and_round_up(nruns, merge_factor);
1275 STXXL_VERBOSE(
"Starting new merge phase: nruns: " << nruns <<
1276 " opt_merge_factor: " << merge_factor <<
" m:" << m_ <<
" new_nruns: " << new_nruns);
// Description of the runs produced by this pass; the element total is
// unchanged by merging.
1278 sorted_runs_type new_runs;
1279 new_runs.runs.resize(new_nruns);
1280 new_runs.runs_sizes.resize(new_nruns);
1281 new_runs.elements = sruns.elements;
1283 unsigned_type runs_left = nruns;
1284 unsigned_type cur_out_run = 0;
1285 unsigned_type elements_in_new_run = 0;
// First sweep: compute size and block count of every output run.
1289 while (runs_left > 0)
// NOTE(review): signed here, while the second sweep below declares
// runs2merge as unsigned_type.
1291 int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
1293 elements_in_new_run = 0;
1294 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); ++i)
1296 elements_in_new_run += sruns.runs_sizes[i];
1299 const unsigned_type blocks_in_new_run1 = div_and_round_up(elements_in_new_run, block_type::size);
1302 new_runs.runs_sizes[cur_out_run] = elements_in_new_run;
1304 new_runs.runs[cur_out_run++].resize(blocks_in_new_run1);
1305 runs_left -= runs2merge;
// Block ranges of all output runs are passed to a call that is not visible
// (presumably block allocation - TODO confirm).
1309 for (unsigned_type i = 0; i < new_runs.runs.size(); ++i)
1311 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(new_runs.runs[i].begin()),
1312 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(new_runs.runs[i].end()));
1318 size_type elements_left = sruns.elements;
// Second sweep: merge each group of input runs into its output run.
1320 while (runs_left > 0)
1322 unsigned_type runs2merge = STXXL_MIN(runs_left, merge_factor);
1323 STXXL_VERBOSE(
"Merging " << runs2merge <<
" runs");
// Slice the next runs2merge runs (and their sizes) out of sruns.
1325 sorted_runs_type cur_runs;
1326 cur_runs.runs.resize(runs2merge);
1327 cur_runs.runs_sizes.resize(runs2merge);
1329 std::copy(sruns.runs.begin() + nruns - runs_left,
1330 sruns.runs.begin() + nruns - runs_left + runs2merge,
1331 cur_runs.runs.begin());
1332 std::copy(sruns.runs_sizes.begin() + nruns - runs_left,
1333 sruns.runs_sizes.begin() + nruns - runs_left + runs2merge,
1334 cur_runs.runs_sizes.begin());
1336 runs_left -= runs2merge;
1342 cur_runs.elements = new_runs.runs_sizes[cur_out_run];
1343 elements_left -= cur_runs.elements;
// The nested merger gets the same block budget, converted back to bytes.
1347 runs_merger<RunsType_, Cmp_, AllocStr_> merger(cur_runs, cmp, m_ * block_type::raw_size * sort_memory_usage_factor());
1351 new_runs.runs[cur_out_run].begin(),
1355 const size_type cnt_max = cur_runs.elements;
1357 while (cnt != cnt_max)
// At the start of every output block, remember its first element as the
// trigger value.
1360 if ((cnt % block_type::size) == 0)
1361 new_runs.runs[cur_out_run][cnt / size_type(block_type::size)].value = *merger;
1368 assert(merger.empty());
// Pad the last output block with the comparator's maximum value.
1370 while (cnt % block_type::size)
1372 *out = cmp.max_value();
// Single leftover run: the block range of the last output run is passed to
// a call that is not visible, and the run's trigger entries and size are
// copied over unchanged instead of being merged.
1381 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
1382 new_runs.runs.back().begin()),
1383 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(
1384 new_runs.runs.back().end()));
1386 assert(cur_runs.runs.size() == 1);
1388 std::copy(cur_runs.runs.front().begin(),
1389 cur_runs.runs.front().end(),
1390 new_runs.runs.back().begin());
1391 new_runs.runs_sizes.back() = cur_runs.runs_sizes.back();
// Every element must have been assigned to some output run.
1396 assert(elements_left == 0);
// sort (stream class, fragment): combines a runs_creator and a runs_merger.
// The creator's result is passed straight to the merger, and the stream
// interface forwards to the merger.
1412 template <
class Input_,
1414 unsigned BlockSize_ = STXXL_DEFAULT_BLOCK_SIZE(
typename Input_::value_type),
1415 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY>
1419 typedef typename runs_creator_type::sorted_runs_type sorted_runs_type;
// Constructor using the same memory budget for run creation and merging.
// in: input stream; c: comparator; memory_to_use: budget for each phase.
1433 sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use) :
1434 creator(in, c, memory_to_use),
1435 merger(creator.result(), c, memory_to_use)
// Constructor with separate budgets: memory_to_use_rc for the run creator,
// memory_to_use_m for the merger.
1443 sort(Input_ & in, Cmp_ c, unsigned_type memory_to_use_rc, unsigned_type memory_to_use_m) :
1444 creator(in, c, memory_to_use_rc),
1445 merger(creator.result(), c, memory_to_use_m)
// Member access to the current element, forwarded to the merger.
1456 const value_type * operator -> ()
const
1459 return merger.operator -> ();
// Emptiness test, forwarded to the merger.
1472 return merger.
empty();
// compute_sorted_runs_type (fragment): helper that names, as `result`, the
// sorted_runs type for a given element type. The leading template
// parameter(s) are not visible in this listing.
1483 unsigned BlockSize_>
1486 typedef ValueType_ value_type;
1488 typedef sort_local::trigger_entry<bid_type, value_type> trigger_entry_type;
1491 typedef sorted_runs<value_type, trigger_entry_type> result;
// sort (free function, fragment): sorts the range [begin, end) by wrapping
// it in an input stream and running the stream sorter over it with a memory
// budget of MemSize. Some parameters and the step that consumes the sorter's
// output are not visible in this listing.
1510 template <
unsigned BlockSize,
1511 class RandomAccessIterator,
1514 void sort(RandomAccessIterator begin,
1515 RandomAccessIterator end,
1517 unsigned_type MemSize,
// Stream type that reads from a pair of iterators.
1522 typedef typename streamify_traits<RandomAccessIterator>::stream_type InputType;
1526 InputType Input(begin, end);
1528 sorter_type Sort(Input, cmp, MemSize);
1534 __STXXL_END_NAMESPACE
1536 #endif // !STXXL_SORT_STREAM_HEADER