Stxxl 1.2.1
|
00001 /*************************************************************************** 00002 * include/stxxl/bits/io/iobase.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2002 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * Copyright (C) 2008 Andreas Beckmann <beckmann@cs.uni-frankfurt.de> 00008 * 00009 * Distributed under the Boost Software License, Version 1.0. 00010 * (See accompanying file LICENSE_1_0.txt or copy at 00011 * http://www.boost.org/LICENSE_1_0.txt) 00012 **************************************************************************/ 00013 00014 #ifndef STXXL_IOBASE_HEADER 00015 #define STXXL_IOBASE_HEADER 00016 00017 #ifdef STXXL_BOOST_CONFIG 00018 #include <boost/config.hpp> 00019 #endif 00020 00021 #if defined (__linux__) 00022 #define STXXL_CHECK_BLOCK_ALIGNING 00023 #endif 00024 00025 //#ifdef __sun__ 00026 //#define O_DIRECT 0 00027 //#endif 00028 00029 00030 #include <cstdlib> 00031 #include <cstdio> 00032 #include <cstring> 00033 #include <cerrno> 00034 00035 #include <fcntl.h> 00036 #include <sys/types.h> 00037 #include <sys/stat.h> 00038 00039 #include <iostream> 00040 #include <algorithm> 00041 #include <string> 00042 #include <queue> 00043 #include <map> 00044 #include <set> 00045 00046 #ifdef BOOST_MSVC 00047 // this is not stxxl/bits/io/io.h ! 00048 #include <io.h> 00049 #else 00050 #include <unistd.h> 00051 #include <sys/resource.h> 00052 #include <sys/wait.h> 00053 #endif 00054 00055 #ifdef STXXL_BOOST_THREADS // Use Portable Boost threads 00056 // Boost.Threads headers 00057 #include <boost/thread/thread.hpp> 00058 #include <boost/thread/mutex.hpp> 00059 #include <boost/bind.hpp> 00060 #else 00061 #include <pthread.h> 00062 #endif 00063 00064 00065 #ifndef O_SYNC 00066 #define O_SYNC 0 00067 #endif 00068 #ifndef O_RSYNC 00069 #define O_RSYNC 0 00070 #endif 00071 #ifndef O_DSYNC 00072 #define O_DSYNC 0 00073 #endif 00074 00075 00076 #if defined (__linux__) 00077 //#include <asm/fcntl.h> 00078 #if !defined (O_DIRECT) && (defined (__alpha__) || defined (__i386__)) 00079 #define O_DIRECT 040000 /* direct disk access */ 00080 #endif 00081 #endif 00082 00083 00084 #ifndef O_DIRECT 00085 #define O_DIRECT O_SYNC 00086 #endif 00087 00088 00089 #include <stxxl/bits/namespace.h> 00090 #include <stxxl/bits/io/iostats.h> 00091 #include <stxxl/bits/common/semaphore.h> 00092 #include <stxxl/bits/common/mutex.h> 00093 #include <stxxl/bits/common/switch.h> 00094 #include <stxxl/bits/common/state.h> 00095 #include <stxxl/bits/common/exceptions.h> 00096 #include <stxxl/bits/io/completion_handler.h> 00097 00098 00099 __STXXL_BEGIN_NAMESPACE 00100 00105 00106 #define BLOCK_ALIGN 4096 00107 00108 typedef void * (*thread_function_t)(void *); 00109 typedef stxxl::int64 DISKID; 00110 00111 class request; 00112 class request_ptr; 00113 00115 00116 struct default_completion_handler 00117 { 00119 void operator () (request *) { } 00120 }; 00121 00123 00126 class file : private noncopyable 00127 { 00128 protected: 00129 int id; 00130 00134 file(int _id) : id(_id) { } 00135 00136 public: 00138 00141 enum open_mode 00142 { 00143 RDONLY = 1, 00144 WRONLY = 2, 00145 RDWR = 4, 00146 CREAT = 8, 00147 DIRECT = 16, 00148 TRUNC = 32 00149 }; 00150 00157 virtual request_ptr aread(void * buffer, stxxl::int64 pos, size_t bytes, 00158 completion_handler on_cmpl) = 0; 00165 virtual request_ptr awrite(void * buffer, stxxl::int64 pos, size_t bytes, 00166 completion_handler on_cmpl) = 0; 00167 00170 virtual void set_size(stxxl::int64 newsize) = 0; 00173 virtual stxxl::int64 size() = 0; 00175 __STXXL_DEPRECATED(int get_disk_number()) 00176 { 00177 return id; 00178 } 00182 int get_id() 00183 { 00184 return id; 00185 } 00186 00188 virtual void lock() { } 00189 00191 virtual void delete_region(int64 offset, unsigned_type size) 00192 { 00193 UNUSED(offset); 00194 UNUSED(size); 00195 } 00196 00197 virtual ~file() { } 00198 }; 00199 00200 class mc; 00201 class disk_queue; 00202 class disk_queues; 00203 00205 00209 class request : private noncopyable 00210 { 00211 friend int wait_any(request_ptr req_array[], int count); 00212 template <class request_iterator_> 00213 friend 00214 request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end); 00215 friend class file; 00216 friend class disk_queue; 00217 friend class disk_queues; 00218 friend class request_ptr; 00219 00220 protected: 00221 virtual bool add_waiter(onoff_switch * sw) = 0; 00222 virtual void delete_waiter(onoff_switch * sw) = 0; 00223 //virtual void enqueue () = 0; 00224 virtual void serve() = 0; 00225 //virtual unsigned size() const; 00226 00227 completion_handler on_complete; 00228 int ref_cnt; 00229 std::auto_ptr<stxxl::io_error> error; 00230 00231 mutex ref_cnt_mutex; 00232 00233 public: 00234 enum request_type { READ, WRITE }; 00235 00236 protected: 00237 file * file_; 00238 void * buffer; 00239 stxxl::int64 offset; 00240 size_t bytes; 00241 request_type type; 00242 00243 void completed() 00244 { 00245 on_complete(this); 00246 } 00247 00248 // returns number of references 00249 int nref() 00250 { 00251 scoped_mutex_lock Lock(ref_cnt_mutex); 00252 return ref_cnt; 00253 } 00254 00255 public: 00256 request(completion_handler on_compl, 00257 file * file__, 00258 void * buffer_, 00259 stxxl::int64 offset_, 00260 size_t bytes_, 00261 request_type type_) : 00262 on_complete(on_compl), ref_cnt(0), 00263 file_(file__), 00264 buffer(buffer_), 00265 offset(offset_), 00266 bytes(bytes_), 00267 type(type_) 00268 { 00269 STXXL_VERBOSE3("request " << static_cast<void *>(this) << ": creation, cnt: " << ref_cnt); 00270 } 00272 virtual void wait() = 0; 00275 virtual bool poll() = 0; 00278 virtual const char * io_type() 00279 { 00280 return "none"; 00281 } 00282 virtual ~request() 00283 { 00284 STXXL_VERBOSE3("request " << static_cast<void *>(this) << ": deletion, cnt: " << ref_cnt); 00285 } 00286 file * get_file() const { return file_; } 00287 void * get_buffer() const { return buffer; } 00288 stxxl::int64 get_offset() const { return offset; } 00289 size_t get_size() const { return bytes; } 00290 size_t size() const { return bytes; } 00291 request_type get_type() const { return type; } 00292 00293 virtual std::ostream & print(std::ostream & out) const 00294 { 00295 out << "File object address: " << (void *)get_file(); 00296 out << " Buffer address: " << (void *)get_buffer(); 00297 out << " File offset: " << get_offset(); 00298 out << " Transfer size: " << get_size() << " bytes"; 00299 out << " Type of transfer: " << ((get_type() == READ) ? "READ" : "WRITE"); 00300 return out; 00301 } 00302 00305 void error_occured(const char * msg) 00306 { 00307 error.reset(new stxxl::io_error(msg)); 00308 } 00309 00312 void error_occured(const std::string & msg) 00313 { 00314 error.reset(new stxxl::io_error(msg)); 00315 } 00316 00318 void check_errors() throw (stxxl::io_error) 00319 { 00320 if (error.get()) 00321 throw * (error.get()); 00322 } 00323 00324 private: 00325 void add_ref() 00326 { 00327 scoped_mutex_lock Lock(ref_cnt_mutex); 00328 ref_cnt++; 00329 STXXL_VERBOSE3("request add_ref() " << static_cast<void *>(this) << ": adding reference, cnt: " << ref_cnt); 00330 } 00331 00332 bool sub_ref() 00333 { 00334 int val; 00335 { 00336 scoped_mutex_lock Lock(ref_cnt_mutex); 00337 val = --ref_cnt; 00338 STXXL_VERBOSE3("request sub_ref() " << static_cast<void *>(this) << ": subtracting reference cnt: " << ref_cnt); 00339 } 00340 assert(val >= 0); 00341 return (val == 0); 00342 } 00343 }; 00344 00345 inline std::ostream & operator << (std::ostream & out, const request & req) 00346 { 00347 return req.print(out); 00348 } 00349 00351 00353 class request_ptr 00354 { 00355 request * ptr; 00356 void add_ref() 00357 { 00358 if (ptr) 00359 { 00360 ptr->add_ref(); 00361 } 00362 } 00363 void sub_ref() 00364 { 00365 if (ptr) 00366 { 00367 if (ptr->sub_ref()) 00368 { 00369 STXXL_VERBOSE3("the last copy " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this)); 00370 delete ptr; 00371 ptr = NULL; 00372 } 00373 else 00374 { 00375 STXXL_VERBOSE3("more copies " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this)); 00376 } 00377 } 00378 } 00379 00380 public: 00382 request_ptr(request * ptr_ = NULL) : ptr(ptr_) 00383 { 00384 STXXL_VERBOSE3("create constructor (request =" << static_cast<void *>(ptr) << ") this=" << static_cast<void *>(this)); 00385 add_ref(); 00386 } 00388 request_ptr(const request_ptr & p) : ptr(p.ptr) 00389 { 00390 STXXL_VERBOSE3("copy constructor (copying " << static_cast<void *>(ptr) << ") this=" << static_cast<void *>(this)); 00391 add_ref(); 00392 } 00394 ~request_ptr() 00395 { 00396 STXXL_VERBOSE3("Destructor of a request_ptr pointing to " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this)); 00397 sub_ref(); 00398 } 00401 request_ptr & operator = (const request_ptr & p) 00402 { 00403 // assert(p.ptr); 00404 return (*this = p.ptr); 00405 } 00408 request_ptr & operator = (request * p) 00409 { 00410 STXXL_VERBOSE3("assign operator begin (assigning " << static_cast<void *>(p) << ") this=" << static_cast<void *>(this)); 00411 if (p != ptr) 00412 { 00413 sub_ref(); 00414 ptr = p; 00415 add_ref(); 00416 } 00417 STXXL_VERBOSE3("assign operator end (assigning " << static_cast<void *>(p) << ") this=" << static_cast<void *>(this)); 00418 return *this; 00419 } 00422 request & operator * () const 00423 { 00424 assert(ptr); 00425 return *ptr; 00426 } 00429 request * operator -> () const 00430 { 00431 assert(ptr); 00432 return ptr; 00433 } 00438 request * get() const { return ptr; } 00439 00441 bool valid() const { return ptr; } 00442 00444 bool empty() const { return !ptr; } 00445 }; 00446 00448 00453 inline int wait_any(request_ptr req_array[], int count); 00457 inline void wait_all(request_ptr req_array[], int count); 00463 inline bool poll_any(request_ptr req_array[], int count, int & index); 00464 00465 00466 void wait_all(request_ptr req_array[], int count) 00467 { 00468 for (int i = 0; i < count; i++) 00469 { 00470 req_array[i]->wait(); 00471 } 00472 } 00473 00474 template <class request_iterator_> 00475 void wait_all(request_iterator_ reqs_begin, request_iterator_ reqs_end) 00476 { 00477 while (reqs_begin != reqs_end) 00478 { 00479 (request_ptr(*reqs_begin))->wait(); 00480 ++reqs_begin; 00481 } 00482 } 00483 00484 bool poll_any(request_ptr req_array[], int count, int & index) 00485 { 00486 index = -1; 00487 for (int i = 0; i < count; i++) 00488 { 00489 if (req_array[i]->poll()) 00490 { 00491 index = i; 00492 return true; 00493 } 00494 } 00495 return false; 00496 } 00497 00498 template <class request_iterator_> 00499 request_iterator_ poll_any(request_iterator_ reqs_begin, request_iterator_ reqs_end) 00500 { 00501 while (reqs_begin != reqs_end) 00502 { 00503 if ((request_ptr(*reqs_begin))->poll()) 00504 return reqs_begin; 00505 00506 ++reqs_begin; 00507 } 00508 return reqs_end; 00509 } 00510 00511 00512 int wait_any(request_ptr req_array[], int count) 00513 { 00514 stats::scoped_wait_timer wait_timer; 00515 00516 onoff_switch sw; 00517 int i = 0, index = -1; 00518 00519 for ( ; i < count; i++) 00520 { 00521 if (req_array[i]->add_waiter(&sw)) 00522 { 00523 // already done 00524 index = i; 00525 00526 while (--i >= 0) 00527 req_array[i]->delete_waiter(&sw); 00528 00529 req_array[index]->check_errors(); 00530 00531 return index; 00532 } 00533 } 00534 00535 sw.wait_for_on(); 00536 00537 for (i = 0; i < count; i++) 00538 { 00539 req_array[i]->delete_waiter(&sw); 00540 if (index < 0 && req_array[i]->poll()) 00541 index = i; 00542 } 00543 00544 return index; 00545 } 00546 00547 template <class request_iterator_> 00548 request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end) 00549 { 00550 stats::scoped_wait_timer wait_timer; 00551 00552 onoff_switch sw; 00553 00554 request_iterator_ cur = reqs_begin, result = reqs_end; 00555 00556 for ( ; cur != reqs_end; cur++) 00557 { 00558 if ((request_ptr(*cur))->add_waiter(&sw)) 00559 { 00560 // already done 00561 result = cur; 00562 00563 if (cur != reqs_begin) 00564 { 00565 while (--cur != reqs_begin) 00566 (request_ptr(*cur))->delete_waiter(&sw); 00567 00568 (request_ptr(*cur))->delete_waiter(&sw); 00569 } 00570 00571 (request_ptr(*result))->check_errors(); 00572 00573 return result; 00574 } 00575 } 00576 00577 sw.wait_for_on(); 00578 00579 for (cur = reqs_begin; cur != reqs_end; cur++) 00580 { 00581 (request_ptr(*cur))->delete_waiter(&sw); 00582 if (result == reqs_end && (request_ptr(*cur))->poll()) 00583 result = cur; 00584 } 00585 00586 return result; 00587 } 00588 00589 class disk_queue : private noncopyable 00590 { 00591 public: 00592 enum priority_op { READ, WRITE, NONE }; 00593 00594 private: 00595 mutex write_mutex; 00596 mutex read_mutex; 00597 std::queue<request_ptr> write_queue; 00598 std::queue<request_ptr> read_queue; 00599 00600 semaphore sem; 00601 00602 priority_op _priority_op; 00603 00604 #ifdef STXXL_BOOST_THREADS 00605 boost::thread thread; 00606 #else 00607 pthread_t thread; 00608 #endif 00609 00610 00611 static void * worker(void * arg); 00612 00613 public: 00614 disk_queue(int n = 1); // max number of requests simultaneously submitted to disk 00615 00616 void set_priority_op(priority_op op) 00617 { 00618 _priority_op = op; 00619 } 00620 void add_readreq(request_ptr & req); 00621 void add_writereq(request_ptr & req); 00622 ~disk_queue(); 00623 }; 00624 00627 class disk_queues : public singleton<disk_queues> 00628 { 00629 friend class singleton<disk_queues>; 00630 00631 protected: 00632 std::map<DISKID, disk_queue *> queues; 00633 disk_queues() { } 00634 00635 public: 00636 void add_readreq(request_ptr & req, DISKID disk) 00637 { 00638 if (queues.find(disk) == queues.end()) 00639 { 00640 // create new disk queue 00641 queues[disk] = new disk_queue(); 00642 } 00643 queues[disk]->add_readreq(req); 00644 } 00645 void add_writereq(request_ptr & req, DISKID disk) 00646 { 00647 if (queues.find(disk) == queues.end()) 00648 { 00649 // create new disk queue 00650 queues[disk] = new disk_queue(); 00651 } 00652 queues[disk]->add_writereq(req); 00653 } 00654 ~disk_queues() 00655 { 00656 // deallocate all queues 00657 for (std::map<DISKID, disk_queue *>::iterator i = 00658 queues.begin(); i != queues.end(); i++) 00659 delete (*i).second; 00660 } 00666 void set_priority_op(disk_queue::priority_op op) 00667 { 00668 for (std::map<DISKID, disk_queue *>::iterator i = 00669 queues.begin(); i != queues.end(); i++) 00670 i->second->set_priority_op(op); 00671 } 00672 }; 00673 00675 00676 __STXXL_END_NAMESPACE 00677 00678 #endif // !STXXL_IOBASE_HEADER