// Stxxl 1.2.1
00001 /*************************************************************************** 00002 * include/stxxl/bits/mng/buf_writer.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2002-2004 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * 00008 * Distributed under the Boost Software License, Version 1.0. 00009 * (See accompanying file LICENSE_1_0.txt or copy at 00010 * http://www.boost.org/LICENSE_1_0.txt) 00011 **************************************************************************/ 00012 00013 #ifndef STXXL_BUFFERED_WRITER_HEADER 00014 #define STXXL_BUFFERED_WRITER_HEADER 00015 00016 #include <vector> 00017 00018 #include <stxxl/bits/mng/mng.h> 00019 00020 00021 __STXXL_BEGIN_NAMESPACE 00022 00029 00030 00034 template <typename block_type> 00035 class buffered_writer 00036 { 00037 buffered_writer() { } 00038 00039 protected: 00040 typedef typename block_type::bid_type bid_type; 00041 00042 const unsigned_type nwriteblocks; 00043 block_type * write_buffers; 00044 bid_type * write_bids; 00045 request_ptr * write_reqs; 00046 const unsigned_type writebatchsize; 00047 00048 std::vector<int_type> free_write_blocks; // contains free write blocks 00049 std::vector<int_type> busy_write_blocks; // blocks that are in writing, notice that if block is not in free_ 00050 // an not in busy then block is not yet filled 00051 00052 struct batch_entry 00053 { 00054 stxxl::int64 offset; 00055 int_type ibuffer; 00056 batch_entry(stxxl::int64 o, int b) : offset(o), ibuffer(b) { } 00057 }; 00058 struct batch_entry_cmp 00059 { 00060 bool operator () (const batch_entry & a, const batch_entry & b) const 00061 { 00062 return (a.offset > b.offset); 00063 } 00064 }; 00065 00066 typedef std::priority_queue<batch_entry, std::vector<batch_entry>, batch_entry_cmp> batch_type; 00067 batch_type batch_write_blocks; // sorted sequence of blocks to write 00068 00069 public: 00074 buffered_writer(unsigned_type write_buf_size, unsigned_type 
write_batch_size) : 00075 nwriteblocks((write_buf_size > 2) ? write_buf_size : 2), 00076 writebatchsize(write_batch_size ? write_batch_size : 1) 00077 { 00078 write_buffers = new block_type[nwriteblocks]; 00079 write_reqs = new request_ptr[nwriteblocks]; 00080 write_bids = new bid_type[nwriteblocks]; 00081 00082 for (unsigned_type i = 0; i < nwriteblocks; i++) 00083 free_write_blocks.push_back(i); 00084 00085 00086 disk_queues::get_instance()->set_priority_op(disk_queue::WRITE); 00087 } 00090 block_type * get_free_block() 00091 { 00092 int_type ibuffer; 00093 for (std::vector<int_type>::iterator it = busy_write_blocks.begin(); 00094 it != busy_write_blocks.end(); ++it) 00095 { 00096 if (write_reqs[ibuffer = (*it)]->poll()) 00097 { 00098 busy_write_blocks.erase(it); 00099 free_write_blocks.push_back(ibuffer); 00100 00101 break; 00102 } 00103 } 00104 if (UNLIKELY(free_write_blocks.empty())) 00105 { 00106 int_type size = busy_write_blocks.size(); 00107 request_ptr * reqs = new request_ptr[size]; 00108 int_type i = 0; 00109 for ( ; i < size; ++i) 00110 { 00111 reqs[i] = write_reqs[busy_write_blocks[i]]; 00112 } 00113 int_type completed = wait_any(reqs, size); 00114 int_type completed_global = busy_write_blocks[completed]; 00115 delete[] reqs; 00116 busy_write_blocks.erase(busy_write_blocks.begin() + completed); 00117 00118 return (write_buffers + completed_global); 00119 } 00120 ibuffer = free_write_blocks.back(); 00121 free_write_blocks.pop_back(); 00122 00123 return (write_buffers + ibuffer); 00124 } 00130 block_type * write(block_type * filled_block, const bid_type & bid) // writes filled_block and returns a new block 00131 { 00132 if (batch_write_blocks.size() >= writebatchsize) 00133 { 00134 // flush batch 00135 while (!batch_write_blocks.empty()) 00136 { 00137 int_type ibuffer = batch_write_blocks.top().ibuffer; 00138 batch_write_blocks.pop(); 00139 00140 if (write_reqs[ibuffer].valid()) 00141 write_reqs[ibuffer]->wait(); 00142 00143 write_reqs[ibuffer] = 
write_buffers[ibuffer].write(write_bids[ibuffer]); 00144 00145 busy_write_blocks.push_back(ibuffer); 00146 } 00147 } 00148 // STXXL_MSG("Adding write request to batch"); 00149 00150 int_type ibuffer = filled_block - write_buffers; 00151 write_bids[ibuffer] = bid; 00152 batch_write_blocks.push(batch_entry(bid.offset, ibuffer)); 00153 00154 return get_free_block(); 00155 } 00157 void flush() 00158 { 00159 int_type ibuffer; 00160 while (!batch_write_blocks.empty()) 00161 { 00162 ibuffer = batch_write_blocks.top().ibuffer; 00163 batch_write_blocks.pop(); 00164 00165 if (write_reqs[ibuffer].valid()) 00166 write_reqs[ibuffer]->wait(); 00167 00168 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]); 00169 00170 busy_write_blocks.push_back(ibuffer); 00171 } 00172 for (std::vector<int_type>::const_iterator it = 00173 busy_write_blocks.begin(); 00174 it != busy_write_blocks.end(); it++) 00175 { 00176 ibuffer = *it; 00177 write_reqs[ibuffer]->wait(); 00178 } 00179 00180 assert(batch_write_blocks.empty()); 00181 free_write_blocks.clear(); 00182 busy_write_blocks.clear(); 00183 00184 for (unsigned_type i = 0; i < nwriteblocks; i++) 00185 free_write_blocks.push_back(i); 00186 } 00187 00189 virtual ~buffered_writer() 00190 { 00191 int_type ibuffer; 00192 while (!batch_write_blocks.empty()) 00193 { 00194 ibuffer = batch_write_blocks.top().ibuffer; 00195 batch_write_blocks.pop(); 00196 00197 if (write_reqs[ibuffer].valid()) 00198 write_reqs[ibuffer]->wait(); 00199 00200 write_reqs[ibuffer] = write_buffers[ibuffer].write(write_bids[ibuffer]); 00201 00202 busy_write_blocks.push_back(ibuffer); 00203 } 00204 for (std::vector<int_type>::const_iterator it = 00205 busy_write_blocks.begin(); 00206 it != busy_write_blocks.end(); it++) 00207 { 00208 ibuffer = *it; 00209 write_reqs[ibuffer]->wait(); 00210 } 00211 00212 delete[] write_reqs; 00213 delete[] write_buffers; 00214 delete[] write_bids; 00215 } 00216 }; 00217 00219 00220 __STXXL_END_NAMESPACE 00221 00222 #endif // 
!STXXL_BUFFERED_WRITER_HEADER