circular_buffer.h

Go to the documentation of this file.
00001 /* -*- c++ -*- */
00002 /*
00003  * Copyright 2006 Free Software Foundation, Inc.
00004  * 
00005  * This file is part of GNU Radio.
00006  *
00007  * GNU Radio is free software; you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation; either version 3, or (at your option)
00010  * any later version.
00011  * 
00012  * GNU Radio is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with GNU Radio; see the file COPYING.  If not, write to
00019  * the Free Software Foundation, Inc., 51 Franklin Street,
00020  * Boston, MA 02110-1301, USA.
00021  */
00022 
00023 #ifndef _CIRCULAR_BUFFER_H_
00024 #define _CIRCULAR_BUFFER_H_
00025 
#include "mld_threads.h"
#include <algorithm>
#include <cstdio>
#include <stdexcept>
00028 
// Set DO_DEBUG to 1 to compile the diagnostic traces into this header's
// code; with 0 every DEBUG() call expands to an empty statement.
#define DO_DEBUG 0

// DEBUG(X): run statement X only when DO_DEBUG is non-zero.
//
// The macro supplies the ';' that terminates X inside the braces, and
// deliberately does NOT end in ';' itself: the caller writes
// "DEBUG (...);" and the result is exactly one statement, so it is safe
// as the body of an un-braced if/else.
#if DO_DEBUG
#define DEBUG(X) do{X;} while(0)
#else
#define DEBUG(X) do{} while(0)
#endif
00036 
00037 template <class T> class circular_buffer
00038 {
00039 private:
00040 // the buffer to use
00041   T* d_buffer;
00042 
00043 // the following are in Items (type T)
00044   UInt32 d_bufLen_I, d_readNdx_I, d_writeNdx_I;
00045   UInt32 d_n_avail_write_I, d_n_avail_read_I;
00046 
00047 // stuff to control access to class internals
00048   mld_mutex_ptr d_internal;
00049   mld_condition_ptr d_readBlock, d_writeBlock;
00050 
00051 // booleans to decide how to control reading, writing, and aborting
00052   bool d_doWriteBlock, d_doFullRead, d_doAbort;
00053 
00054   void delete_mutex_cond () {
00055     if (d_internal) {
00056       delete d_internal;
00057       d_internal = NULL;
00058     }
00059     if (d_readBlock) {
00060       delete d_readBlock;
00061       d_readBlock = NULL;
00062     }
00063     if (d_writeBlock) {
00064       delete d_writeBlock;
00065       d_writeBlock = NULL;
00066     }
00067   };
00068 
00069 public:
00070   circular_buffer (UInt32 bufLen_I,
00071                    bool doWriteBlock = true, bool doFullRead = false) {
00072     if (bufLen_I == 0)
00073       throw std::runtime_error ("circular_buffer(): "
00074                                 "Number of items to buffer must be > 0.\n");
00075     d_bufLen_I = bufLen_I;
00076     d_buffer = (T*) new T[d_bufLen_I];
00077     d_doWriteBlock = doWriteBlock;
00078     d_doFullRead = doFullRead;
00079     d_internal = NULL;
00080     d_readBlock = d_writeBlock = NULL;
00081     reset ();
00082     DEBUG (fprintf (stderr, "c_b(): buf len (items) = %ld, "
00083                     "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I,
00084                     (d_doWriteBlock ? "true" : "false"),
00085                     (d_doFullRead ? "true" : "false")));
00086   };
00087 
00088   ~circular_buffer () {
00089     delete_mutex_cond ();
00090     delete [] d_buffer;
00091   };
00092 
00093   inline UInt32 n_avail_write_items () {
00094     d_internal->lock ();
00095     UInt32 retVal = d_n_avail_write_I;
00096     d_internal->unlock ();
00097     return (retVal);
00098   };
00099 
00100   inline UInt32 n_avail_read_items () {
00101     d_internal->lock ();
00102     UInt32 retVal = d_n_avail_read_I;
00103     d_internal->unlock ();
00104     return (retVal);
00105   };
00106 
00107   inline UInt32 buffer_length_items () {return (d_bufLen_I);};
00108   inline bool do_write_block () {return (d_doWriteBlock);};
00109   inline bool do_full_read () {return (d_doFullRead);};
00110 
00111   void reset () {
00112     d_doAbort = false;
00113     bzero (d_buffer, d_bufLen_I * sizeof (T));
00114     d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0;
00115     d_n_avail_write_I = d_bufLen_I;
00116     delete_mutex_cond ();
00117     // create a mutex to handle contention of shared resources;
00118     // any routine needed access to shared resources uses lock()
00119     // before doing anything, then unlock() when finished.
00120     d_internal = new mld_mutex ();
00121     // link the internal mutex to the read and write conditions;
00122     // when wait() is called, the internal mutex will automatically
00123     // be unlock()'ed.  Upon return (from a signal() to the condition),
00124     // the internal mutex will be lock()'ed.
00125     d_readBlock = new mld_condition (d_internal);
00126     d_writeBlock = new mld_condition (d_internal);
00127   };
00128 
00129 /*
00130  * enqueue: add the given buffer of item-length to the queue,
00131  *     first-in-first-out (FIFO).
00132  *
00133  * inputs:
00134  *     buf: a pointer to the buffer holding the data
00135  *
00136  *     bufLen_I: the buffer length in items (of the instantiated type)
00137  *
00138  * returns:
00139  *    -1: on overflow (write is not blocking, and data is being
00140  *                     written faster than it is being read)
00141  *     0: if nothing to do (0 length buffer)
00142  *     1: if success
00143  *     2: in the process of aborting, do doing nothing
00144  *
00145  * will throw runtime errors if inputs are improper:
00146  *     buffer pointer is NULL
00147  *     buffer length is larger than the instantiated buffer length
00148  */
00149 
00150   int enqueue (T* buf, UInt32 bufLen_I) {
00151     DEBUG (fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, "
00152                     "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I,
00153                     d_n_avail_write_I, d_n_avail_read_I));
00154     if (bufLen_I > d_bufLen_I) {
00155       fprintf (stderr, "cannot add buffer longer (%ld"
00156                ") than instantiated length (%ld"
00157                ").\n", bufLen_I, d_bufLen_I);
00158       throw std::runtime_error ("circular_buffer::enqueue()");
00159     }
00160 
00161     if (bufLen_I == 0)
00162       return (0);
00163     if (!buf)
00164       throw std::runtime_error ("circular_buffer::enqueue(): "
00165                                 "input buffer is NULL.\n");
00166     d_internal->lock ();
00167     if (d_doAbort) {
00168       d_internal->unlock ();
00169       return (2);
00170     }
00171     // set the return value to 1: success; change if needed
00172     int retval = 1;
00173     if (bufLen_I > d_n_avail_write_I) {
00174       if (d_doWriteBlock) {
00175         while (bufLen_I > d_n_avail_write_I) {
00176           DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n"));
00177           // wait will automatically unlock() the internal mutex
00178           d_writeBlock->wait ();
00179           // and lock() it here.
00180           if (d_doAbort) {
00181             d_internal->unlock ();
00182             DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n"));
00183             return (2);
00184           }
00185           DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n"));
00186         }
00187       } else {
00188         d_n_avail_read_I = d_bufLen_I - bufLen_I;
00189         d_n_avail_write_I = bufLen_I;
00190         DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n"));
00191         retval = -1;
00192       }
00193     }
00194     UInt32 n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0;
00195     if (n_now_I > bufLen_I)
00196       n_now_I = bufLen_I;
00197     else if (n_now_I < bufLen_I)
00198       n_start_I = bufLen_I - n_now_I;
00199     bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T));
00200     if (n_start_I) {
00201       bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T));
00202       d_writeNdx_I = n_start_I;
00203     } else
00204       d_writeNdx_I += n_now_I;
00205     d_n_avail_read_I += bufLen_I;
00206     d_n_avail_write_I -= bufLen_I;
00207     d_readBlock->signal ();
00208     d_internal->unlock ();
00209     return (retval);
00210   };
00211 
00212 /*
00213  * dequeue: removes from the queue the number of items requested, or
00214  *     available, into the given buffer on a FIFO basis.
00215  *
00216  * inputs:
00217  *     buf: a pointer to the buffer into which to copy the data
00218  *
00219  *     bufLen_I: pointer to the number of items to remove in items
00220  *         (of the instantiated type)
00221  *
00222  * returns:
00223  *     0: if nothing to do (0 length buffer)
00224  *     1: if success
00225  *     2: in the process of aborting, do doing nothing
00226  *
00227  * will throw runtime errors if inputs are improper:
00228  *     buffer pointer is NULL
00229  *     buffer length pointer is NULL
00230  *     buffer length is larger than the instantiated buffer length
00231  */
00232 
00233   int dequeue (T* buf, UInt32* bufLen_I) {
00234     DEBUG (fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, "
00235                     "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I,
00236                     d_n_avail_write_I, d_n_avail_read_I));
00237     if (!bufLen_I)
00238       throw std::runtime_error ("circular_buffer::dequeue(): "
00239                                 "input bufLen pointer is NULL.\n");
00240     if (!buf)
00241       throw std::runtime_error ("circular_buffer::dequeue(): "
00242                                 "input buffer pointer is NULL.\n");
00243     UInt32 l_bufLen_I = *bufLen_I;
00244     if (l_bufLen_I == 0)
00245       return (0);
00246     if (l_bufLen_I > d_bufLen_I) {
00247       fprintf (stderr, "cannot remove buffer longer (%ld"
00248                ") than instantiated length (%ld"
00249                ").\n", l_bufLen_I, d_bufLen_I);
00250       throw std::runtime_error ("circular_buffer::dequeue()");
00251     }
00252 
00253     d_internal->lock ();
00254     if (d_doAbort) {
00255       d_internal->unlock ();
00256       return (2);
00257     }
00258     if (d_doFullRead) {
00259       while (d_n_avail_read_I < l_bufLen_I) {
00260         DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n"));
00261         // wait will automatically unlock() the internal mutex
00262         d_readBlock->wait ();
00263         // and lock() it here.
00264         if (d_doAbort) {
00265           d_internal->unlock ();
00266           DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n"));
00267           return (2);
00268         }
00269         DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n"));
00270      }
00271     } else {
00272       while (d_n_avail_read_I == 0) {
00273         DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n"));
00274         // wait will automatically unlock() the internal mutex
00275         d_readBlock->wait ();
00276         // and lock() it here.
00277         if (d_doAbort) {
00278           d_internal->unlock ();
00279           DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n"));
00280           return (2);
00281         }
00282         DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n"));
00283       }
00284     }
00285     if (l_bufLen_I > d_n_avail_read_I)
00286       l_bufLen_I = d_n_avail_read_I;
00287     UInt32 n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0;
00288     if (n_now_I > l_bufLen_I)
00289       n_now_I = l_bufLen_I;
00290     else if (n_now_I < l_bufLen_I)
00291       n_start_I = l_bufLen_I - n_now_I;
00292     bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T));
00293     if (n_start_I) {
00294       bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T));
00295       d_readNdx_I = n_start_I;
00296     } else
00297       d_readNdx_I += n_now_I;
00298     *bufLen_I = l_bufLen_I;
00299     d_n_avail_read_I -= l_bufLen_I;
00300     d_n_avail_write_I += l_bufLen_I;
00301     d_writeBlock->signal ();
00302     d_internal->unlock ();
00303     return (1);
00304   };
00305 
00306   void abort () {
00307     d_internal->lock ();
00308     d_doAbort = true;
00309     d_writeBlock->signal ();
00310     d_readBlock->signal ();
00311     d_internal->unlock ();
00312   };
00313 };
00314 
00315 #endif /* _CIRCULAR_BUFFER_H_ */

Generated on Thu Mar 5 09:01:06 2009 for GNU Radio 3.1.3 by  doxygen 1.5.8