00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef _CIRCULAR_BUFFER_H_
00024 #define _CIRCULAR_BUFFER_H_
00025
#include "mld_threads.h"

#include <algorithm>
#include <cstdio>
#include <stdexcept>
00028
// Set DO_DEBUG to 1 to compile in the diagnostic statements wrapped by
// DEBUG(); leave it at 0 to compile them out entirely.
#define DO_DEBUG 0

// DEBUG(X) runs the single statement/expression X when DO_DEBUG is non-zero
// and expands to an empty statement otherwise.  The expansion deliberately
// has NO trailing semicolon: the caller supplies it ("DEBUG (foo ());"), so
// the macro behaves like one ordinary statement and is safe inside an
// unbraced if/else.  X itself is written without a trailing semicolon; the
// macro adds the one that terminates it.
#if DO_DEBUG
#define DEBUG(X) do { X; } while (0)
#else
#define DEBUG(X) do { } while (0)
#endif
00036
00037 template <class T> class circular_buffer
00038 {
00039 private:
00040
00041 T* d_buffer;
00042
00043
00044 UInt32 d_bufLen_I, d_readNdx_I, d_writeNdx_I;
00045 UInt32 d_n_avail_write_I, d_n_avail_read_I;
00046
00047
00048 mld_mutex_ptr d_internal;
00049 mld_condition_ptr d_readBlock, d_writeBlock;
00050
00051
00052 bool d_doWriteBlock, d_doFullRead, d_doAbort;
00053
00054 void delete_mutex_cond () {
00055 if (d_internal) {
00056 delete d_internal;
00057 d_internal = NULL;
00058 }
00059 if (d_readBlock) {
00060 delete d_readBlock;
00061 d_readBlock = NULL;
00062 }
00063 if (d_writeBlock) {
00064 delete d_writeBlock;
00065 d_writeBlock = NULL;
00066 }
00067 };
00068
00069 public:
00070 circular_buffer (UInt32 bufLen_I,
00071 bool doWriteBlock = true, bool doFullRead = false) {
00072 if (bufLen_I == 0)
00073 throw std::runtime_error ("circular_buffer(): "
00074 "Number of items to buffer must be > 0.\n");
00075 d_bufLen_I = bufLen_I;
00076 d_buffer = (T*) new T[d_bufLen_I];
00077 d_doWriteBlock = doWriteBlock;
00078 d_doFullRead = doFullRead;
00079 d_internal = NULL;
00080 d_readBlock = d_writeBlock = NULL;
00081 reset ();
00082 DEBUG (fprintf (stderr, "c_b(): buf len (items) = %ld, "
00083 "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I,
00084 (d_doWriteBlock ? "true" : "false"),
00085 (d_doFullRead ? "true" : "false")));
00086 };
00087
00088 ~circular_buffer () {
00089 delete_mutex_cond ();
00090 delete [] d_buffer;
00091 };
00092
00093 inline UInt32 n_avail_write_items () {
00094 d_internal->lock ();
00095 UInt32 retVal = d_n_avail_write_I;
00096 d_internal->unlock ();
00097 return (retVal);
00098 };
00099
00100 inline UInt32 n_avail_read_items () {
00101 d_internal->lock ();
00102 UInt32 retVal = d_n_avail_read_I;
00103 d_internal->unlock ();
00104 return (retVal);
00105 };
00106
00107 inline UInt32 buffer_length_items () {return (d_bufLen_I);};
00108 inline bool do_write_block () {return (d_doWriteBlock);};
00109 inline bool do_full_read () {return (d_doFullRead);};
00110
00111 void reset () {
00112 d_doAbort = false;
00113 bzero (d_buffer, d_bufLen_I * sizeof (T));
00114 d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0;
00115 d_n_avail_write_I = d_bufLen_I;
00116 delete_mutex_cond ();
00117
00118
00119
00120 d_internal = new mld_mutex ();
00121
00122
00123
00124
00125 d_readBlock = new mld_condition (d_internal);
00126 d_writeBlock = new mld_condition (d_internal);
00127 };
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150 int enqueue (T* buf, UInt32 bufLen_I) {
00151 DEBUG (fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, "
00152 "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I,
00153 d_n_avail_write_I, d_n_avail_read_I));
00154 if (bufLen_I > d_bufLen_I) {
00155 fprintf (stderr, "cannot add buffer longer (%ld"
00156 ") than instantiated length (%ld"
00157 ").\n", bufLen_I, d_bufLen_I);
00158 throw std::runtime_error ("circular_buffer::enqueue()");
00159 }
00160
00161 if (bufLen_I == 0)
00162 return (0);
00163 if (!buf)
00164 throw std::runtime_error ("circular_buffer::enqueue(): "
00165 "input buffer is NULL.\n");
00166 d_internal->lock ();
00167 if (d_doAbort) {
00168 d_internal->unlock ();
00169 return (2);
00170 }
00171
00172 int retval = 1;
00173 if (bufLen_I > d_n_avail_write_I) {
00174 if (d_doWriteBlock) {
00175 while (bufLen_I > d_n_avail_write_I) {
00176 DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n"));
00177
00178 d_writeBlock->wait ();
00179
00180 if (d_doAbort) {
00181 d_internal->unlock ();
00182 DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n"));
00183 return (2);
00184 }
00185 DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n"));
00186 }
00187 } else {
00188 d_n_avail_read_I = d_bufLen_I - bufLen_I;
00189 d_n_avail_write_I = bufLen_I;
00190 DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n"));
00191 retval = -1;
00192 }
00193 }
00194 UInt32 n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0;
00195 if (n_now_I > bufLen_I)
00196 n_now_I = bufLen_I;
00197 else if (n_now_I < bufLen_I)
00198 n_start_I = bufLen_I - n_now_I;
00199 bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T));
00200 if (n_start_I) {
00201 bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T));
00202 d_writeNdx_I = n_start_I;
00203 } else
00204 d_writeNdx_I += n_now_I;
00205 d_n_avail_read_I += bufLen_I;
00206 d_n_avail_write_I -= bufLen_I;
00207 d_readBlock->signal ();
00208 d_internal->unlock ();
00209 return (retval);
00210 };
00211
00212
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233 int dequeue (T* buf, UInt32* bufLen_I) {
00234 DEBUG (fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, "
00235 "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I,
00236 d_n_avail_write_I, d_n_avail_read_I));
00237 if (!bufLen_I)
00238 throw std::runtime_error ("circular_buffer::dequeue(): "
00239 "input bufLen pointer is NULL.\n");
00240 if (!buf)
00241 throw std::runtime_error ("circular_buffer::dequeue(): "
00242 "input buffer pointer is NULL.\n");
00243 UInt32 l_bufLen_I = *bufLen_I;
00244 if (l_bufLen_I == 0)
00245 return (0);
00246 if (l_bufLen_I > d_bufLen_I) {
00247 fprintf (stderr, "cannot remove buffer longer (%ld"
00248 ") than instantiated length (%ld"
00249 ").\n", l_bufLen_I, d_bufLen_I);
00250 throw std::runtime_error ("circular_buffer::dequeue()");
00251 }
00252
00253 d_internal->lock ();
00254 if (d_doAbort) {
00255 d_internal->unlock ();
00256 return (2);
00257 }
00258 if (d_doFullRead) {
00259 while (d_n_avail_read_I < l_bufLen_I) {
00260 DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n"));
00261
00262 d_readBlock->wait ();
00263
00264 if (d_doAbort) {
00265 d_internal->unlock ();
00266 DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n"));
00267 return (2);
00268 }
00269 DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n"));
00270 }
00271 } else {
00272 while (d_n_avail_read_I == 0) {
00273 DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n"));
00274
00275 d_readBlock->wait ();
00276
00277 if (d_doAbort) {
00278 d_internal->unlock ();
00279 DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n"));
00280 return (2);
00281 }
00282 DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n"));
00283 }
00284 }
00285 if (l_bufLen_I > d_n_avail_read_I)
00286 l_bufLen_I = d_n_avail_read_I;
00287 UInt32 n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0;
00288 if (n_now_I > l_bufLen_I)
00289 n_now_I = l_bufLen_I;
00290 else if (n_now_I < l_bufLen_I)
00291 n_start_I = l_bufLen_I - n_now_I;
00292 bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T));
00293 if (n_start_I) {
00294 bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T));
00295 d_readNdx_I = n_start_I;
00296 } else
00297 d_readNdx_I += n_now_I;
00298 *bufLen_I = l_bufLen_I;
00299 d_n_avail_read_I -= l_bufLen_I;
00300 d_n_avail_write_I += l_bufLen_I;
00301 d_writeBlock->signal ();
00302 d_internal->unlock ();
00303 return (1);
00304 };
00305
00306 void abort () {
00307 d_internal->lock ();
00308 d_doAbort = true;
00309 d_writeBlock->signal ();
00310 d_readBlock->signal ();
00311 d_internal->unlock ();
00312 };
00313 };
00314
00315 #endif