// Stxxl 1.2.1
//
00001 /*************************************************************************** 00002 * include/stxxl/bits/algo/sort.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2002-2003 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * Copyright (C) 2006 Johannes Singler <singler@ira.uka.de> 00008 * Copyright (C) 2008 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00009 * 00010 * Distributed under the Boost Software License, Version 1.0. 00011 * (See accompanying file LICENSE_1_0.txt or copy at 00012 * http://www.boost.org/LICENSE_1_0.txt) 00013 **************************************************************************/ 00014 00015 #ifndef STXXL_SORT_HEADER 00016 #define STXXL_SORT_HEADER 00017 00018 #include <list> 00019 #include <functional> 00020 00021 #include <stxxl/bits/mng/mng.h> 00022 #include <stxxl/bits/common/rand.h> 00023 #include <stxxl/bits/mng/adaptor.h> 00024 #include <stxxl/bits/common/simple_vector.h> 00025 #include <stxxl/bits/common/switch.h> 00026 #include <stxxl/bits/common/settings.h> 00027 #include <stxxl/bits/mng/block_alloc_interleaved.h> 00028 #include <stxxl/bits/algo/intksort.h> 00029 #include <stxxl/bits/algo/adaptor.h> 00030 #include <stxxl/bits/algo/async_schedule.h> 00031 #include <stxxl/bits/mng/block_prefetcher.h> 00032 #include <stxxl/bits/mng/buf_writer.h> 00033 #include <stxxl/bits/algo/run_cursor.h> 00034 #include <stxxl/bits/algo/losertree.h> 00035 #include <stxxl/bits/algo/inmemsort.h> 00036 #include <stxxl/bits/parallel.h> 00037 00038 00039 //#define SORT_OPTIMAL_PREFETCHING 00040 //#define INTERLEAVED_ALLOC 00041 //#define STXXL_CHECK_ORDER_IN_SORTS 00042 00043 __STXXL_BEGIN_NAMESPACE 00044 00047 00048 00051 namespace sort_local 00052 { 00053 template <typename BIDTp_, typename ValTp_> 00054 struct trigger_entry 00055 { 00056 typedef BIDTp_ bid_type; 00057 typedef ValTp_ value_type; 00058 00059 bid_type bid; 00060 value_type value; 00061 00062 operator bid_type () 00063 { 00064 
return bid; 00065 } 00066 }; 00067 00068 template <typename BIDTp_, typename ValTp_, typename ValueCmp_> 00069 struct trigger_entry_cmp : public std::binary_function<trigger_entry<BIDTp_, ValTp_>, trigger_entry<BIDTp_, ValTp_>, bool> 00070 { 00071 typedef trigger_entry<BIDTp_, ValTp_> trigger_entry_type; 00072 ValueCmp_ cmp; 00073 trigger_entry_cmp(ValueCmp_ c) : cmp(c) { } 00074 trigger_entry_cmp(const trigger_entry_cmp & a) : cmp(a.cmp) { } 00075 bool operator () (const trigger_entry_type & a, const trigger_entry_type & b) const 00076 { 00077 return cmp(a.value, b.value); 00078 } 00079 }; 00080 00081 template <typename block_type, 00082 typename prefetcher_type, 00083 typename value_cmp> 00084 struct run_cursor2_cmp 00085 { 00086 typedef run_cursor2<block_type, prefetcher_type> cursor_type; 00087 value_cmp cmp; 00088 00089 run_cursor2_cmp(value_cmp c) : cmp(c) { } 00090 run_cursor2_cmp(const run_cursor2_cmp & a) : cmp(a.cmp) { } 00091 inline bool operator () (const cursor_type & a, const cursor_type & b) 00092 { 00093 if (UNLIKELY(b.empty())) 00094 return true; 00095 // sentinel emulation 00096 if (UNLIKELY(a.empty())) 00097 return false; 00098 //sentinel emulation 00099 00100 return (cmp(a.current(), b.current())); 00101 } 00102 }; 00103 00104 template <typename block_type, typename bid_type> 00105 struct read_next_after_write_completed 00106 { 00107 block_type * block; 00108 bid_type bid; 00109 request_ptr * req; 00110 void operator () (request * /*completed_req*/) 00111 { 00112 * req = block->read(bid); 00113 } 00114 }; 00115 00116 00117 template < 00118 typename block_type, 00119 typename run_type, 00120 typename input_bid_iterator, 00121 typename value_cmp> 00122 void 00123 create_runs( 00124 input_bid_iterator it, 00125 run_type ** runs, 00126 int_type nruns, 00127 int_type _m, 00128 value_cmp cmp) 00129 { 00130 typedef typename block_type::value_type type; 00131 typedef typename block_type::bid_type bid_type; 00132 STXXL_VERBOSE1("stxxl::create_runs 
nruns=" << nruns << " m=" << _m); 00133 00134 int_type m2 = _m / 2; 00135 block_manager * bm = block_manager::get_instance(); 00136 block_type * Blocks1 = new block_type[m2]; 00137 block_type * Blocks2 = new block_type[m2]; 00138 bid_type * bids1 = new bid_type[m2]; 00139 bid_type * bids2 = new bid_type[m2]; 00140 request_ptr * read_reqs1 = new request_ptr[m2]; 00141 request_ptr * read_reqs2 = new request_ptr[m2]; 00142 request_ptr * write_reqs = new request_ptr[m2]; 00143 read_next_after_write_completed<block_type, bid_type> * next_run_reads = 00144 new read_next_after_write_completed<block_type, bid_type>[m2]; 00145 00146 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE); 00147 00148 int_type i; 00149 int_type run_size = 0, next_run_size = 0; 00150 00151 assert(nruns >= 2); 00152 00153 run_size = runs[0]->size(); 00154 assert(run_size == m2); 00155 00156 for (i = 0; i < run_size; ++i) 00157 { 00158 STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks1[i].elem)); 00159 bids1[i] = *(it++); 00160 read_reqs1[i] = Blocks1[i].read(bids1[i]); 00161 } 00162 00163 run_size = runs[1]->size(); 00164 00165 for (i = 0; i < run_size; ++i) 00166 { 00167 STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks2[i].elem)); 00168 bids2[i] = *(it++); 00169 read_reqs2[i] = Blocks2[i].read(bids2[i]); 00170 } 00171 00172 for (int_type k = 0; k < nruns - 1; ++k) 00173 { 00174 run_type * run = runs[k]; 00175 run_size = run->size(); 00176 assert(run_size == m2); 00177 next_run_size = runs[k + 1]->size(); 00178 assert((next_run_size == m2) || (next_run_size <= m2 && k == nruns - 2)); 00179 00180 STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1"); 00181 wait_all(read_reqs1, run_size); 00182 STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1"); 00183 for (i = 0; i < run_size; ++i) 00184 bm->delete_block(bids1[i]); 00185 00186 if (block_type::has_filler) 00187 std::sort( 00188 ArrayOfSequencesIterator< 00189 block_type, typename 
block_type::value_type, block_type::size 00190 >(Blocks1, 0), 00191 ArrayOfSequencesIterator< 00192 block_type, typename block_type::value_type, block_type::size 00193 >(Blocks1, run_size * block_type::size), 00194 cmp); 00195 else 00196 std::sort(Blocks1[0].elem, Blocks1[run_size].elem, cmp); 00197 00198 00199 STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs"); 00200 if (k > 0) 00201 wait_all(write_reqs, m2); 00202 STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs"); 00203 00204 int_type runplus2size = (k < nruns - 2) ? runs[k + 2]->size() : 0; 00205 for (i = 0; i < m2; ++i) 00206 { 00207 STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem)); 00208 (*run)[i].value = Blocks1[i][0]; 00209 if (i >= runplus2size) { 00210 write_reqs[i] = Blocks1[i].write((*run)[i].bid); 00211 } 00212 else 00213 { 00214 next_run_reads[i].block = Blocks1 + i; 00215 next_run_reads[i].req = read_reqs1 + i; 00216 bids1[i] = next_run_reads[i].bid = *(it++); 00217 write_reqs[i] = Blocks1[i].write((*run)[i].bid, next_run_reads[i]); 00218 } 00219 } 00220 std::swap(Blocks1, Blocks2); 00221 std::swap(bids1, bids2); 00222 std::swap(read_reqs1, read_reqs2); 00223 } 00224 00225 run_type * run = runs[nruns - 1]; 00226 run_size = run->size(); 00227 STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1"); 00228 wait_all(read_reqs1, run_size); 00229 STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1"); 00230 for (i = 0; i < run_size; ++i) 00231 bm->delete_block(bids1[i]); 00232 00233 if (block_type::has_filler) { 00234 std::sort( 00235 ArrayOfSequencesIterator< 00236 block_type, typename block_type::value_type, block_type::size 00237 >(Blocks1, 0), 00238 ArrayOfSequencesIterator< 00239 block_type, typename block_type::value_type, block_type::size 00240 >(Blocks1, run_size * block_type::size), 00241 cmp); 00242 } else { 00243 std::sort(Blocks1[0].elem, Blocks1[run_size].elem, cmp); 00244 } 00245 00246 STXXL_VERBOSE1("stxxl::create_runs start 
waiting write_reqs"); 00247 wait_all(write_reqs, m2); 00248 STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs"); 00249 00250 for (i = 0; i < run_size; ++i) 00251 { 00252 STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem)); 00253 (*run)[i].value = Blocks1[i][0]; 00254 write_reqs[i] = Blocks1[i].write((*run)[i].bid); 00255 } 00256 00257 STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs"); 00258 wait_all(write_reqs, run_size); 00259 STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs"); 00260 00261 delete[] Blocks1; 00262 delete[] Blocks2; 00263 delete[] bids1; 00264 delete[] bids2; 00265 delete[] read_reqs1; 00266 delete[] read_reqs2; 00267 delete[] write_reqs; 00268 delete[] next_run_reads; 00269 } 00270 00271 00272 template <typename block_type, typename run_type, typename value_cmp> 00273 bool check_sorted_runs(run_type ** runs, 00274 unsigned_type nruns, 00275 unsigned_type m, 00276 value_cmp cmp) 00277 { 00278 typedef typename block_type::value_type value_type; 00279 00280 //STXXL_VERBOSE1("check_sorted_runs Runs: "<<nruns); 00281 STXXL_MSG("check_sorted_runs Runs: " << nruns); 00282 unsigned_type irun = 0; 00283 for (irun = 0; irun < nruns; ++irun) 00284 { 00285 const unsigned_type nblocks_per_run = runs[irun]->size(); 00286 unsigned_type blocks_left = nblocks_per_run; 00287 block_type * blocks = new block_type[m]; 00288 request_ptr * reqs = new request_ptr[m]; 00289 value_type last = cmp.min_value(); 00290 00291 for (unsigned_type off = 0; off < nblocks_per_run; off += m) 00292 { 00293 const unsigned_type nblocks = STXXL_MIN(blocks_left, m); 00294 const unsigned_type nelements = nblocks * block_type::size; 00295 blocks_left -= nblocks; 00296 00297 for (unsigned_type j = 0; j < nblocks; ++j) 00298 { 00299 reqs[j] = blocks[j].read((*runs[irun])[j + off].bid); 00300 } 00301 wait_all(reqs, reqs + nblocks); 00302 00303 if (off && cmp(blocks[0][0], last)) 00304 { 00305 STXXL_MSG("check_sorted_runs wrong first 
value in the run " << irun); 00306 STXXL_MSG(" first value: " << blocks[0][0]); 00307 STXXL_MSG(" last value: " << last); 00308 for (unsigned_type k = 0; k < block_type::size; ++k) 00309 STXXL_MSG("Element " << k << " in the block is :" << blocks[0][k]); 00310 00311 return false; 00312 } 00313 00314 for (unsigned_type j = 0; j < nblocks; ++j) 00315 { 00316 if (!(blocks[j][0] == (*runs[irun])[j + off].value)) 00317 { 00318 STXXL_MSG("check_sorted_runs wrong trigger in the run " << irun << " block " << (j + off)); 00319 STXXL_MSG(" trigger value: " << (*runs[irun])[j + off].value); 00320 STXXL_MSG("Data in the block:"); 00321 for (unsigned_type k = 0; k < block_type::size; ++k) 00322 STXXL_MSG("Element " << k << " in the block is :" << blocks[j][k]); 00323 00324 STXXL_MSG("BIDS:"); 00325 for (unsigned_type k = 0; k < nblocks; ++k) 00326 { 00327 if (k == j) 00328 STXXL_MSG("Bad one comes next."); 00329 STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid)); 00330 } 00331 00332 return false; 00333 } 00334 } 00335 if (!stxxl::is_sorted( 00336 ArrayOfSequencesIterator< 00337 block_type, typename block_type::value_type, block_type::size 00338 >(blocks, 0), 00339 ArrayOfSequencesIterator< 00340 block_type, typename block_type::value_type, block_type::size 00341 >(blocks, nelements), 00342 cmp)) 00343 { 00344 STXXL_MSG("check_sorted_runs wrong order in the run " << irun); 00345 STXXL_MSG("Data in blocks:"); 00346 for (unsigned_type j = 0; j < nblocks; ++j) 00347 { 00348 for (unsigned_type k = 0; k < block_type::size; ++k) 00349 STXXL_MSG(" Element " << k << " in block " << (j + off) << " is :" << blocks[j][k]); 00350 } 00351 STXXL_MSG("BIDS:"); 00352 for (unsigned_type k = 0; k < nblocks; ++k) 00353 { 00354 STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid)); 00355 } 00356 00357 return false; 00358 } 00359 00360 last = blocks[nblocks - 1][block_type::size - 1]; 00361 } 00362 00363 assert(blocks_left == 0); 00364 delete[] reqs; 00365 
delete[] blocks; 00366 } 00367 00368 return true; 00369 } 00370 00371 00372 template <typename block_type, typename run_type, typename value_cmp> 00373 void merge_runs(run_type ** in_runs, int_type nruns, run_type * out_run, unsigned_type _m, value_cmp cmp 00374 ) 00375 { 00376 typedef typename block_type::bid_type bid_type; 00377 typedef typename block_type::value_type value_type; 00378 typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type; 00379 typedef run_cursor2<block_type, prefetcher_type> run_cursor_type; 00380 typedef run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type; 00381 00382 int_type i; 00383 run_type consume_seq(out_run->size()); 00384 00385 int_type * prefetch_seq = new int_type[out_run->size()]; 00386 00387 typename run_type::iterator copy_start = consume_seq.begin(); 00388 for (i = 0; i < nruns; i++) 00389 { 00390 // TODO: try to avoid copy 00391 copy_start = std::copy( 00392 in_runs[i]->begin(), 00393 in_runs[i]->end(), 00394 copy_start); 00395 } 00396 00397 std::stable_sort(consume_seq.begin(), consume_seq.end(), 00398 trigger_entry_cmp<bid_type, value_type, value_cmp>(cmp)); 00399 00400 int_type disks_number = config::get_instance()->disks_number(); 00401 00402 #ifdef PLAY_WITH_OPT_PREF 00403 const int_type n_write_buffers = 4 * disks_number; 00404 #else 00405 const int_type n_prefetch_buffers = STXXL_MAX(2 * disks_number, (3 * (int_type(_m) - nruns) / 4)); 00406 const int_type n_write_buffers = STXXL_MAX(2 * disks_number, int_type(_m) - nruns - n_prefetch_buffers); 00407 #ifdef SORT_OPTIMAL_PREFETCHING 00408 // heuristic 00409 const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10; 00410 #endif 00411 #endif 00412 00413 #ifdef SORT_OPTIMAL_PREFETCHING 00414 compute_prefetch_schedule( 00415 consume_seq, 00416 prefetch_seq, 00417 n_opt_prefetch_buffers, 00418 disks_number); 00419 #else 00420 for (i = 0; i < out_run->size(); i++) 00421 
prefetch_seq[i] = i; 00422 00423 #endif 00424 00425 prefetcher_type prefetcher(consume_seq.begin(), 00426 consume_seq.end(), 00427 prefetch_seq, 00428 nruns + n_prefetch_buffers); 00429 00430 buffered_writer<block_type> writer(n_write_buffers, n_write_buffers / 2); 00431 00432 int_type out_run_size = out_run->size(); 00433 00434 block_type * out_buffer = writer.get_free_block(); 00435 00436 //If parallelism is activated, one can still fall back to the 00437 //native merge routine by setting stxxl::SETTINGS::native_merge= true, //otherwise, it is used anyway. 00438 00439 if (do_parallel_merge()) 00440 { 00441 #if STXXL_PARALLEL_MULTIWAY_MERGE 00442 00443 // begin of STL-style merging 00444 00445 typedef stxxl::int64 diff_type; 00446 typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence; 00447 typedef typename std::vector<sequence>::size_type seqs_size_type; 00448 std::vector<sequence> seqs(nruns); 00449 std::vector<block_type *> buffers(nruns); 00450 00451 for (int_type i = 0; i < nruns; i++) //initialize sequences 00452 { 00453 buffers[i] = prefetcher.pull_block(); //get first block of each run 00454 seqs[i] = std::make_pair(buffers[i]->begin(), buffers[i]->end()); 00455 //this memory location stays the same, only the data is exchanged 00456 } 00457 00458 #ifdef STXXL_CHECK_ORDER_IN_SORTS 00459 value_type last_elem = cmp.min_value(); 00460 #endif 00461 00462 for (int_type j = 0; j < out_run_size; ++j) //for the whole output run, out_run_size is in blocks 00463 { 00464 diff_type rest = block_type::size; //elements still to merge for this output block 00465 00466 STXXL_VERBOSE1("output block " << j); 00467 do { 00468 value_type * min_last_element = NULL; //no element found yet 00469 diff_type total_size = 0; 00470 00471 for (seqs_size_type i = 0; i < seqs.size(); i++) 00472 { 00473 if (seqs[i].first == seqs[i].second) 00474 continue; //run empty 00475 00476 if (min_last_element == NULL) 00477 min_last_element = &(*(seqs[i].second - 
1)); 00478 else 00479 min_last_element = cmp(*min_last_element, *(seqs[i].second - 1)) ? min_last_element : &(*(seqs[i].second - 1)); 00480 00481 total_size += seqs[i].second - seqs[i].first; 00482 STXXL_VERBOSE1("last " << *(seqs[i].second - 1) << " block size " << (seqs[i].second - seqs[i].first)); 00483 } 00484 00485 assert(min_last_element != NULL); //there must be some element 00486 00487 STXXL_VERBOSE1("min_last_element " << min_last_element << " total size " << total_size + (block_type::size - rest)); 00488 00489 diff_type less_equal_than_min_last = 0; 00490 //locate this element in all sequences 00491 for (seqs_size_type i = 0; i < seqs.size(); i++) 00492 { 00493 if (seqs[i].first == seqs[i].second) 00494 continue; //empty subsequence 00495 00496 typename block_type::iterator position = std::upper_bound(seqs[i].first, seqs[i].second, *min_last_element, cmp); 00497 STXXL_VERBOSE1("greater equal than " << position - seqs[i].first); 00498 less_equal_than_min_last += position - seqs[i].first; 00499 } 00500 00501 STXXL_VERBOSE1("finished loop"); 00502 00503 ptrdiff_t output_size = (std::min)(less_equal_than_min_last, rest); //at most rest elements 00504 00505 STXXL_VERBOSE1("before merge" << output_size); 00506 00507 stxxl::parallel::multiway_merge(seqs.begin(), seqs.end(), out_buffer->end() - rest, cmp, output_size); 00508 //sequence iterators are progressed appropriately 00509 00510 STXXL_VERBOSE1("after merge"); 00511 00512 (*out_run)[j].value = (*out_buffer)[0]; //save smallest value 00513 00514 rest -= output_size; 00515 00516 STXXL_VERBOSE1("so long"); 00517 00518 for (seqs_size_type i = 0; i < seqs.size(); i++) 00519 { 00520 if (seqs[i].first == seqs[i].second) //run empty 00521 { 00522 if (prefetcher.block_consumed(buffers[i])) 00523 { 00524 seqs[i].first = buffers[i]->begin(); //reset iterator 00525 seqs[i].second = buffers[i]->end(); 00526 STXXL_VERBOSE1("block ran empty " << i); 00527 } 00528 else 00529 { 00530 seqs.erase(seqs.begin() + i); //remove 
this sequence 00531 buffers.erase(buffers.begin() + i); 00532 STXXL_VERBOSE1("seq removed " << i); 00533 } 00534 } 00535 } 00536 } while (rest > 0 && seqs.size() > 0); 00537 00538 #ifdef STXXL_CHECK_ORDER_IN_SORTS 00539 if (!stxxl::is_sorted(out_buffer->begin(), out_buffer->end(), cmp)) 00540 { 00541 for (value_type * i = out_buffer->begin() + 1; i != out_buffer->end(); i++) 00542 if (cmp(*i, *(i - 1))) 00543 { 00544 STXXL_VERBOSE1("Error at position " << (i - out_buffer->begin())); 00545 } 00546 assert(false); 00547 } 00548 00549 if (j > 0) //do not check in first iteration 00550 assert(cmp((*out_buffer)[0], last_elem) == false); 00551 00552 last_elem = (*out_buffer)[block_type::size - 1]; 00553 #endif 00554 00555 00556 out_buffer = writer.write(out_buffer, (*out_run)[j].bid); 00557 } 00558 00559 // end of STL-style merging 00560 00561 #else 00562 assert(false); 00563 #endif 00564 } 00565 else 00566 { 00567 // begin of native merging procedure 00568 00569 loser_tree<run_cursor_type, run_cursor2_cmp_type, block_type::size> 00570 losers(&prefetcher, nruns, run_cursor2_cmp_type(cmp)); 00571 00572 #ifdef STXXL_CHECK_ORDER_IN_SORTS 00573 value_type last_elem = cmp.min_value(); 00574 #endif 00575 00576 for (i = 0; i < out_run_size; ++i) 00577 { 00578 losers.multi_merge(out_buffer->elem); 00579 (*out_run)[i].value = *(out_buffer->elem); 00580 00581 #ifdef STXXL_CHECK_ORDER_IN_SORTS 00582 assert(stxxl::is_sorted( 00583 out_buffer->begin(), 00584 out_buffer->end(), 00585 cmp)); 00586 00587 if (i) 00588 assert(cmp(*(out_buffer->elem), last_elem) == false); 00589 00590 last_elem = (*out_buffer).elem[block_type::size - 1]; 00591 #endif 00592 00593 out_buffer = writer.write(out_buffer, (*out_run)[i].bid); 00594 } 00595 00596 // end of native merging procedure 00597 } 00598 00599 delete[] prefetch_seq; 00600 00601 block_manager * bm = block_manager::get_instance(); 00602 for (i = 0; i < nruns; ++i) 00603 { 00604 unsigned_type sz = in_runs[i]->size(); 00605 for (unsigned_type j 
= 0; j < sz; ++j) 00606 bm->delete_block((*in_runs[i])[j].bid); 00607 00608 00609 delete in_runs[i]; 00610 } 00611 } 00612 00613 00614 template <typename block_type, 00615 typename alloc_strategy, 00616 typename input_bid_iterator, 00617 typename value_cmp> 00618 simple_vector<trigger_entry<typename block_type::bid_type, typename block_type::value_type> > * 00619 sort_blocks(input_bid_iterator input_bids, 00620 unsigned_type _n, 00621 unsigned_type _m, 00622 value_cmp cmp 00623 ) 00624 { 00625 typedef typename block_type::value_type type; 00626 typedef typename block_type::bid_type bid_type; 00627 typedef trigger_entry<bid_type, type> trigger_entry_type; 00628 typedef simple_vector<trigger_entry_type> run_type; 00629 typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy; 00630 00631 unsigned_type m2 = _m / 2; 00632 unsigned_type full_runs = _n / m2; 00633 unsigned_type partial_runs = ((_n % m2) ? 1 : 0); 00634 unsigned_type nruns = full_runs + partial_runs; 00635 unsigned_type i; 00636 00637 config * cfg = config::get_instance(); 00638 block_manager * mng = block_manager::get_instance(); 00639 const unsigned_type ndisks = cfg->disks_number(); 00640 00641 //STXXL_VERBOSE ("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+" << partial_runs); 00642 00643 double begin = timestamp(), after_runs_creation, end; 00644 00645 run_type ** runs = new run_type *[nruns]; 00646 00647 for (i = 0; i < full_runs; i++) 00648 runs[i] = new run_type(m2); 00649 00650 00651 if (partial_runs) 00652 runs[i] = new run_type(_n - full_runs * m2); 00653 00654 00655 for (i = 0; i < nruns; ++i) 00656 { 00657 // FIXME: why has an alloc_strategy to take two arguments disk_index.begin(), disk_index.end() ??? 
00658 mng->new_blocks(alloc_strategy(0, ndisks), 00659 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->begin()), 00660 trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->end())); 00661 } 00662 00663 sort_local::create_runs<block_type, 00664 run_type, 00665 input_bid_iterator, 00666 value_cmp>(input_bids, runs, nruns, _m, cmp); 00667 00668 after_runs_creation = timestamp(); 00669 00670 double io_wait_after_rf = stats::get_instance()->get_io_wait_time(); 00671 00672 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE); 00673 00674 // Optimal merging: merge r = pow(nruns,1/ceil(log(nruns)/log(m))) runs at once 00675 00676 const int_type merge_factor = static_cast<int_type>(ceil(pow(nruns, 1. / ceil(log(double(nruns)) / 00677 log(double(_m)))))); 00678 run_type ** new_runs; 00679 00680 while (nruns > 1) 00681 { 00682 int_type new_nruns = div_and_round_up(nruns, merge_factor); 00683 STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns << 00684 " opt_merge_factor: " << merge_factor << " m:" << _m << " new_nruns: " << new_nruns); 00685 00686 new_runs = new run_type *[new_nruns]; 00687 00688 int_type runs_left = nruns; 00689 int_type cur_out_run = 0; 00690 int_type blocks_in_new_run = 0; 00691 00692 while (runs_left > 0) 00693 { 00694 int_type runs2merge = STXXL_MIN(runs_left, merge_factor); 00695 blocks_in_new_run = 0; 00696 for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++) 00697 blocks_in_new_run += runs[i]->size(); 00698 00699 // allocate run 00700 new_runs[cur_out_run++] = new run_type(blocks_in_new_run); 00701 runs_left -= runs2merge; 00702 } 00703 // allocate blocks for the new runs 00704 if (cur_out_run == 1 && blocks_in_new_run == int_type(_n) && (input_bids->storage->get_id() == -1)) 00705 { 00706 // if we sort a file we can reuse the input bids for the output 00707 input_bid_iterator cur = input_bids; 00708 for (int_type i = 0; cur != (input_bids 
+ _n); ++cur) 00709 { 00710 (*new_runs[0])[i++].bid = *cur; 00711 } 00712 00713 bid_type & firstBID = (*new_runs[0])[0].bid; 00714 if (firstBID.storage->get_id() != -1) 00715 { 00716 // the first block does not belong to the file 00717 // need to reallocate it 00718 mng->new_blocks(FR(), &firstBID, (&firstBID) + 1); 00719 } 00720 bid_type & lastBID = (*new_runs[0])[_n - 1].bid; 00721 if (lastBID.storage->get_id() != -1) 00722 { 00723 // the first block does not belong to the file 00724 // need to reallocate it 00725 mng->new_blocks(FR(), &lastBID, (&lastBID) + 1); 00726 } 00727 } 00728 else 00729 { 00730 mng->new_blocks(interleaved_alloc_strategy(new_nruns, 0, ndisks), 00731 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, 0, new_nruns, blocks_in_new_run), 00732 RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, _n, new_nruns, blocks_in_new_run)); 00733 } 00734 // merge all 00735 runs_left = nruns; 00736 cur_out_run = 0; 00737 while (runs_left > 0) 00738 { 00739 int_type runs2merge = STXXL_MIN(runs_left, merge_factor); 00740 #ifdef STXXL_CHECK_ORDER_IN_SORTS 00741 assert((check_sorted_runs<block_type, run_type, value_cmp>(runs + nruns - runs_left, runs2merge, m2, cmp))); 00742 #endif 00743 STXXL_VERBOSE("Merging " << runs2merge << " runs"); 00744 merge_runs<block_type, run_type>(runs + nruns - runs_left, 00745 runs2merge, *(new_runs + (cur_out_run++)), _m, cmp 00746 ); 00747 runs_left -= runs2merge; 00748 } 00749 00750 nruns = new_nruns; 00751 delete[] runs; 00752 runs = new_runs; 00753 } 00754 00755 run_type * result = *runs; 00756 delete[] runs; 00757 00758 end = timestamp(); 00759 00760 STXXL_VERBOSE("Elapsed time : " << end - begin << " s. 
Run creation time: " << 00761 after_runs_creation - begin << " s"); 00762 STXXL_VERBOSE("Time in I/O wait(rf): " << io_wait_after_rf << " s"); 00763 STXXL_VERBOSE(*stats::get_instance()); 00764 UNUSED(begin + io_wait_after_rf); 00765 00766 return result; 00767 } 00768 } 00769 00770 00778 template <typename ExtIterator_, typename StrictWeakOrdering_> 00779 void sort(ExtIterator_ first, ExtIterator_ last, StrictWeakOrdering_ cmp, unsigned_type M) 00780 { 00781 typedef simple_vector<sort_local::trigger_entry<typename ExtIterator_::bid_type, 00782 typename ExtIterator_::vector_type::value_type> > run_type; 00783 00784 typedef typename ExtIterator_::vector_type::value_type value_type; 00785 typedef typename ExtIterator_::block_type block_type; 00786 00787 // verify strict weak ordering of the sentinels 00788 assert(!cmp(cmp.min_value(), cmp.min_value())); 00789 assert(cmp(cmp.min_value(), cmp.max_value())); 00790 assert(!cmp(cmp.max_value(), cmp.max_value())); 00791 00792 unsigned_type n = 0; 00793 00794 block_manager * mng = block_manager::get_instance(); 00795 00796 first.flush(); 00797 00798 if ((last - first) * sizeof(value_type) * sort_memory_usage_factor() < M) 00799 { 00800 stl_in_memory_sort(first, last, cmp); 00801 } 00802 else 00803 { 00804 assert(2 * block_type::raw_size * sort_memory_usage_factor() <= M); 00805 00806 if (first.block_offset()) 00807 { 00808 if (last.block_offset()) // first and last element are 00809 // not the first elements of their block 00810 { 00811 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type; 00812 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type; 00813 typename ExtIterator_::bid_type first_bid, last_bid; 00814 request_ptr req; 00815 00816 req = first_block->read(*first.bid()); 00817 mng->new_blocks(FR(), &first_bid, (&first_bid) + 1); // try to overlap 00818 mng->new_blocks(FR(), &last_bid, (&last_bid) + 1); 00819 req->wait(); 00820 00821 00822 req = 
last_block->read(*last.bid()); 00823 00824 unsigned_type i = 0; 00825 for ( ; i < first.block_offset(); ++i) 00826 { 00827 first_block->elem[i] = cmp.min_value(); 00828 } 00829 00830 req->wait(); 00831 00832 00833 req = first_block->write(first_bid); 00834 for (i = last.block_offset(); i < block_type::size; ++i) 00835 { 00836 last_block->elem[i] = cmp.max_value(); 00837 } 00838 00839 req->wait(); 00840 00841 00842 req = last_block->write(last_bid); 00843 00844 n = last.bid() - first.bid() + 1; 00845 00846 std::swap(first_bid, *first.bid()); 00847 std::swap(last_bid, *last.bid()); 00848 00849 req->wait(); 00850 00851 00852 delete first_block; 00853 delete last_block; 00854 00855 run_type * out = 00856 sort_local::sort_blocks< 00857 typename ExtIterator_::block_type, 00858 typename ExtIterator_::vector_type::alloc_strategy, 00859 typename ExtIterator_::bids_container_iterator> 00860 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp); 00861 00862 00863 first_block = new typename ExtIterator_::block_type; 00864 last_block = new typename ExtIterator_::block_type; 00865 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type; 00866 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type; 00867 request_ptr * reqs = new request_ptr[2]; 00868 00869 reqs[0] = first_block->read(first_bid); 00870 reqs[1] = sorted_first_block->read((*(out->begin())).bid); 00871 00872 reqs[0]->wait(); 00873 reqs[1]->wait(); 00874 00875 reqs[0] = last_block->read(last_bid); 00876 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid); 00877 00878 for (i = first.block_offset(); i < block_type::size; i++) 00879 { 00880 first_block->elem[i] = sorted_first_block->elem[i]; 00881 } 00882 00883 reqs[0]->wait(); 00884 reqs[1]->wait(); 00885 00886 req = first_block->write(first_bid); 00887 00888 for (i = 0; i < last.block_offset(); ++i) 00889 { 00890 last_block->elem[i] = 
sorted_last_block->elem[i]; 00891 } 00892 00893 req->wait(); 00894 00895 req = last_block->write(last_bid); 00896 00897 mng->delete_block(out->begin()->bid); 00898 mng->delete_block((*out)[out->size() - 1].bid); 00899 00900 *first.bid() = first_bid; 00901 *last.bid() = last_bid; 00902 00903 typename run_type::iterator it = out->begin(); 00904 ++it; 00905 typename ExtIterator_::bids_container_iterator cur_bid = first.bid(); 00906 ++cur_bid; 00907 00908 for ( ; cur_bid != last.bid(); ++cur_bid, ++it) 00909 { 00910 *cur_bid = (*it).bid; 00911 } 00912 00913 delete first_block; 00914 delete sorted_first_block; 00915 delete sorted_last_block; 00916 delete[] reqs; 00917 delete out; 00918 00919 req->wait(); 00920 00921 00922 delete last_block; 00923 } 00924 else 00925 { 00926 // first element is 00927 // not the first element of its block 00928 typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type; 00929 typename ExtIterator_::bid_type first_bid; 00930 request_ptr req; 00931 00932 req = first_block->read(*first.bid()); 00933 mng->new_blocks(FR(), &first_bid, (&first_bid) + 1); // try to overlap 00934 req->wait(); 00935 00936 00937 unsigned_type i = 0; 00938 for ( ; i < first.block_offset(); ++i) 00939 { 00940 first_block->elem[i] = cmp.min_value(); 00941 } 00942 00943 req = first_block->write(first_bid); 00944 00945 n = last.bid() - first.bid(); 00946 00947 std::swap(first_bid, *first.bid()); 00948 00949 req->wait(); 00950 00951 00952 delete first_block; 00953 00954 run_type * out = 00955 sort_local::sort_blocks< 00956 typename ExtIterator_::block_type, 00957 typename ExtIterator_::vector_type::alloc_strategy, 00958 typename ExtIterator_::bids_container_iterator> 00959 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp); 00960 00961 00962 first_block = new typename ExtIterator_::block_type; 00963 00964 typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type; 00965 00966 
request_ptr * reqs = new request_ptr[2]; 00967 00968 reqs[0] = first_block->read(first_bid); 00969 reqs[1] = sorted_first_block->read((*(out->begin())).bid); 00970 00971 reqs[0]->wait(); 00972 reqs[1]->wait(); 00973 00974 for (i = first.block_offset(); i < block_type::size; ++i) 00975 { 00976 first_block->elem[i] = sorted_first_block->elem[i]; 00977 } 00978 00979 req = first_block->write(first_bid); 00980 00981 mng->delete_block(out->begin()->bid); 00982 00983 *first.bid() = first_bid; 00984 00985 typename run_type::iterator it = out->begin(); 00986 ++it; 00987 typename ExtIterator_::bids_container_iterator cur_bid = first.bid(); 00988 ++cur_bid; 00989 00990 for ( ; cur_bid != last.bid(); ++cur_bid, ++it) 00991 { 00992 *cur_bid = (*it).bid; 00993 } 00994 00995 *cur_bid = (*it).bid; 00996 00997 delete sorted_first_block; 00998 delete[] reqs; 00999 delete out; 01000 01001 req->wait(); 01002 01003 delete first_block; 01004 } 01005 } 01006 else 01007 { 01008 if (last.block_offset()) // last is 01009 // not the first element of its block 01010 { 01011 typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type; 01012 typename ExtIterator_::bid_type last_bid; 01013 request_ptr req; 01014 unsigned_type i; 01015 01016 req = last_block->read(*last.bid()); 01017 mng->new_blocks(FR(), &last_bid, (&last_bid) + 1); 01018 req->wait(); 01019 01020 01021 for (i = last.block_offset(); i < block_type::size; ++i) 01022 { 01023 last_block->elem[i] = cmp.max_value(); 01024 } 01025 01026 req = last_block->write(last_bid); 01027 01028 n = last.bid() - first.bid() + 1; 01029 01030 std::swap(last_bid, *last.bid()); 01031 01032 req->wait(); 01033 01034 01035 delete last_block; 01036 01037 run_type * out = 01038 sort_local::sort_blocks< 01039 typename ExtIterator_::block_type, 01040 typename ExtIterator_::vector_type::alloc_strategy, 01041 typename ExtIterator_::bids_container_iterator> 01042 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, 
cmp); 01043 01044 01045 last_block = new typename ExtIterator_::block_type; 01046 typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type; 01047 request_ptr * reqs = new request_ptr[2]; 01048 01049 reqs[0] = last_block->read(last_bid); 01050 reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid); 01051 01052 reqs[0]->wait(); 01053 reqs[1]->wait(); 01054 01055 for (i = 0; i < last.block_offset(); ++i) 01056 { 01057 last_block->elem[i] = sorted_last_block->elem[i]; 01058 } 01059 01060 req = last_block->write(last_bid); 01061 01062 mng->delete_block((*out)[out->size() - 1].bid); 01063 01064 *last.bid() = last_bid; 01065 01066 typename run_type::iterator it = out->begin(); 01067 typename ExtIterator_::bids_container_iterator cur_bid = first.bid(); 01068 01069 for ( ; cur_bid != last.bid(); ++cur_bid, ++it) 01070 { 01071 *cur_bid = (*it).bid; 01072 } 01073 01074 delete sorted_last_block; 01075 delete[] reqs; 01076 delete out; 01077 01078 req->wait(); 01079 01080 delete last_block; 01081 } 01082 else 01083 { 01084 // first and last element are first elements of their of blocks 01085 n = last.bid() - first.bid(); 01086 01087 run_type * out = 01088 sort_local::sort_blocks<typename ExtIterator_::block_type, 01089 typename ExtIterator_::vector_type::alloc_strategy, 01090 typename ExtIterator_::bids_container_iterator> 01091 (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp); 01092 01093 typename run_type::iterator it = out->begin(); 01094 typename ExtIterator_::bids_container_iterator cur_bid = first.bid(); 01095 01096 for ( ; cur_bid != last.bid(); ++cur_bid, ++it) 01097 { 01098 *cur_bid = (*it).bid; 01099 } 01100 01101 delete out; 01102 } 01103 } 01104 } 01105 01106 #ifdef STXXL_CHECK_ORDER_IN_SORTS 01107 assert(stxxl::is_sorted(first, last, cmp)); 01108 #endif 01109 } 01110 01112 01113 __STXXL_END_NAMESPACE 01114 01115 #endif // !STXXL_SORT_HEADER 01116 // vim: et:ts=4:sw=4