00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #ifndef _CIRCULAR_BUFFER_H_
00024 #define _CIRCULAR_BUFFER_H_
00025
#include "mld_threads.h"

#include <cstdio>
#include <cstring>
#include <stdexcept>
00028
// Debug tracing is compiled out unless DO_DEBUG is defined to a non-zero
// value before this header is included.
#ifndef DO_DEBUG
#define DO_DEBUG 0
#endif

// DEBUG(X) executes the statement(s) X only when DO_DEBUG is non-zero.
// The expansion intentionally carries no trailing ';': the caller supplies
// it (`DEBUG (stmt;);`), so the macro expands to exactly one statement and
// can be used as the body of an unbraced `if ... else`.
#if DO_DEBUG
#define DEBUG(X) do { X } while (0)
#else
#define DEBUG(X) do { } while (0)
#endif
00038
00039 template <class T> class circular_buffer
00040 {
00041 private:
00042
00043 T* d_buffer;
00044
00045
00046 UInt32 d_bufLen_I, d_readNdx_I, d_writeNdx_I;
00047 UInt32 d_n_avail_write_I, d_n_avail_read_I;
00048
00049
00050 mld_mutex_ptr d_internal;
00051 mld_condition_ptr d_readBlock, d_writeBlock;
00052
00053
00054 bool d_doWriteBlock, d_doFullRead, d_doAbort;
00055
00056 void delete_mutex_cond () {
00057 if (d_internal) {
00058 delete d_internal;
00059 d_internal = NULL;
00060 }
00061 if (d_readBlock) {
00062 delete d_readBlock;
00063 d_readBlock = NULL;
00064 }
00065 if (d_writeBlock) {
00066 delete d_writeBlock;
00067 d_writeBlock = NULL;
00068 }
00069 };
00070
00071 public:
00072 circular_buffer (UInt32 bufLen_I,
00073 bool doWriteBlock = true, bool doFullRead = false) {
00074 if (bufLen_I == 0)
00075 throw std::runtime_error ("circular_buffer(): "
00076 "Number of items to buffer must be > 0.\n");
00077 d_bufLen_I = bufLen_I;
00078 d_buffer = (T*) new T[d_bufLen_I];
00079 d_doWriteBlock = doWriteBlock;
00080 d_doFullRead = doFullRead;
00081 d_internal = NULL;
00082 d_readBlock = d_writeBlock = NULL;
00083 reset ();
00084 DEBUG (fprintf (stderr, "c_b(): buf len (items) = %ld, "
00085 "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I,
00086 (d_doWriteBlock ? "true" : "false"),
00087 (d_doFullRead ? "true" : "false")););
00088 };
00089
00090 ~circular_buffer () {
00091 delete_mutex_cond ();
00092 delete [] d_buffer;
00093 };
00094
00095 inline UInt32 n_avail_write_items () {
00096 d_internal->lock ();
00097 UInt32 retVal = d_n_avail_write_I;
00098 d_internal->unlock ();
00099 return (retVal);
00100 };
00101
00102 inline UInt32 n_avail_read_items () {
00103 d_internal->lock ();
00104 UInt32 retVal = d_n_avail_read_I;
00105 d_internal->unlock ();
00106 return (retVal);
00107 };
00108
00109 inline UInt32 buffer_length_items () {return (d_bufLen_I);};
00110 inline bool do_write_block () {return (d_doWriteBlock);};
00111 inline bool do_full_read () {return (d_doFullRead);};
00112
00113 void reset () {
00114 d_doAbort = false;
00115 bzero (d_buffer, d_bufLen_I * sizeof (T));
00116 d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0;
00117 d_n_avail_write_I = d_bufLen_I;
00118 delete_mutex_cond ();
00119
00120
00121
00122 d_internal = new mld_mutex ();
00123
00124
00125
00126
00127 d_readBlock = new mld_condition (d_internal);
00128 d_writeBlock = new mld_condition (d_internal);
00129 };
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152 int enqueue (T* buf, UInt32 bufLen_I) {
00153 DEBUG (fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, "
00154 "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I,
00155 d_n_avail_write_I, d_n_avail_read_I););
00156 if (bufLen_I > d_bufLen_I) {
00157 fprintf (stderr, "cannot add buffer longer (%ld"
00158 ") than instantiated length (%ld"
00159 ").\n", bufLen_I, d_bufLen_I);
00160 throw std::runtime_error ("circular_buffer::enqueue()");
00161 }
00162
00163 if (bufLen_I == 0)
00164 return (0);
00165 if (!buf)
00166 throw std::runtime_error ("circular_buffer::enqueue(): "
00167 "input buffer is NULL.\n");
00168 d_internal->lock ();
00169 if (d_doAbort) {
00170 d_internal->unlock ();
00171 return (2);
00172 }
00173
00174 int retval = 1;
00175 if (bufLen_I > d_n_avail_write_I) {
00176 if (d_doWriteBlock) {
00177 while (bufLen_I > d_n_avail_write_I) {
00178 DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n"););
00179
00180 d_writeBlock->wait ();
00181
00182 if (d_doAbort) {
00183 d_internal->unlock ();
00184 DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n"););
00185 return (2);
00186 }
00187 DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n"););
00188 }
00189 } else {
00190 d_n_avail_read_I = d_bufLen_I - bufLen_I;
00191 d_n_avail_write_I = bufLen_I;
00192 DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n"););
00193 retval = -1;
00194 }
00195 }
00196 UInt32 n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0;
00197 if (n_now_I > bufLen_I)
00198 n_now_I = bufLen_I;
00199 else if (n_now_I < bufLen_I)
00200 n_start_I = bufLen_I - n_now_I;
00201 bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T));
00202 if (n_start_I) {
00203 bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T));
00204 d_writeNdx_I = n_start_I;
00205 } else
00206 d_writeNdx_I += n_now_I;
00207 d_n_avail_read_I += bufLen_I;
00208 d_n_avail_write_I -= bufLen_I;
00209 d_readBlock->signal ();
00210 d_internal->unlock ();
00211 return (retval);
00212 };
00213
00214
00215
00216
00217
00218
00219
00220
00221
00222
00223
00224
00225
00226
00227
00228
00229
00230
00231
00232
00233
00234
00235 int dequeue (T* buf, UInt32* bufLen_I) {
00236 DEBUG (fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, "
00237 "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I,
00238 d_n_avail_write_I, d_n_avail_read_I););
00239 if (!bufLen_I)
00240 throw std::runtime_error ("circular_buffer::dequeue(): "
00241 "input bufLen pointer is NULL.\n");
00242 if (!buf)
00243 throw std::runtime_error ("circular_buffer::dequeue(): "
00244 "input buffer pointer is NULL.\n");
00245 UInt32 l_bufLen_I = *bufLen_I;
00246 if (l_bufLen_I == 0)
00247 return (0);
00248 if (l_bufLen_I > d_bufLen_I) {
00249 fprintf (stderr, "cannot remove buffer longer (%ld"
00250 ") than instantiated length (%ld"
00251 ").\n", l_bufLen_I, d_bufLen_I);
00252 throw std::runtime_error ("circular_buffer::dequeue()");
00253 }
00254
00255 d_internal->lock ();
00256 if (d_doAbort) {
00257 d_internal->unlock ();
00258 return (2);
00259 }
00260 if (d_doFullRead) {
00261 while (d_n_avail_read_I < l_bufLen_I) {
00262 DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n"););
00263
00264 d_readBlock->wait ();
00265
00266 if (d_doAbort) {
00267 d_internal->unlock ();
00268 DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n"););
00269 return (2);
00270 }
00271 DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n"););
00272 }
00273 } else {
00274 while (d_n_avail_read_I == 0) {
00275 DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n"););
00276
00277 d_readBlock->wait ();
00278
00279 if (d_doAbort) {
00280 d_internal->unlock ();
00281 DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n"););
00282 return (2);
00283 }
00284 DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n"););
00285 }
00286 }
00287 if (l_bufLen_I > d_n_avail_read_I)
00288 l_bufLen_I = d_n_avail_read_I;
00289 UInt32 n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0;
00290 if (n_now_I > l_bufLen_I)
00291 n_now_I = l_bufLen_I;
00292 else if (n_now_I < l_bufLen_I)
00293 n_start_I = l_bufLen_I - n_now_I;
00294 bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T));
00295 if (n_start_I) {
00296 bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T));
00297 d_readNdx_I = n_start_I;
00298 } else
00299 d_readNdx_I += n_now_I;
00300 *bufLen_I = l_bufLen_I;
00301 d_n_avail_read_I -= l_bufLen_I;
00302 d_n_avail_write_I += l_bufLen_I;
00303 d_writeBlock->signal ();
00304 d_internal->unlock ();
00305 return (1);
00306 };
00307
00308 void abort () {
00309 d_internal->lock ();
00310 d_doAbort = true;
00311 d_writeBlock->signal ();
00312 d_readBlock->signal ();
00313 d_internal->unlock ();
00314 };
00315 };
00316
00317 #endif