GNU Radio 3.5.1 C++ API
circular_buffer.h
Go to the documentation of this file.
00001 /* -*- c++ -*- */
00002 /*
00003  * Copyright 2006,2009,2010 Free Software Foundation, Inc.
00004  * 
00005  * This file is part of GNU Radio.
00006  *
00007  * GNU Radio is free software; you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation; either version 3, or (at your option)
00010  * any later version.
00011  * 
00012  * GNU Radio is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  * 
00017  * You should have received a copy of the GNU General Public License
00018  * along with GNU Radio; see the file COPYING.  If not, write to
00019  * the Free Software Foundation, Inc., 51 Franklin Street,
00020  * Boston, MA 02110-1301, USA.
00021  */
00022 
00023 #ifndef _CIRCULAR_BUFFER_H_
00024 #define _CIRCULAR_BUFFER_H_
00025 
#include <gruel/thread.h>
#include <algorithm>
#include <iostream>
#include <stdexcept>
00029 
/*
 * DO_DEBUG: set to a non-zero value (before including this header) to
 * compile the DEBUG() trace statements in; defaults to 0 (compiled out).
 */
#ifndef DO_DEBUG
#define DO_DEBUG 0
#endif

/*
 * DEBUG(X): executes statement/expression X only when DO_DEBUG is non-zero.
 *
 * X is terminated inside the macro, so it may be passed as a bare
 * expression (e.g. a stream insertion chain).  The macro itself does not
 * end in ';' so that "DEBUG (...);" is exactly one statement and can be
 * used safely as the body of an if/else.
 */
#if DO_DEBUG
#define DEBUG(X) do { X; } while (0)
#else
#define DEBUG(X) do { } while (0)
#endif
00039 
00040 template <class T>
00041 class circular_buffer
00042 {
00043 private:
00044 // the buffer to use
00045   T* d_buffer;
00046 
00047 // the following are in Items (type T)
00048   size_t d_bufLen_I, d_readNdx_I, d_writeNdx_I;
00049   size_t d_n_avail_write_I, d_n_avail_read_I;
00050 
00051 // stuff to control access to class internals
00052   gruel::mutex* d_internal;
00053   gruel::condition_variable* d_readBlock;
00054   gruel::condition_variable* d_writeBlock;
00055 
00056 // booleans to decide how to control reading, writing, and aborting
00057   bool d_doWriteBlock, d_doFullRead, d_doAbort;
00058 
00059   void delete_mutex_cond () {
00060     if (d_internal) {
00061       delete d_internal;
00062       d_internal = NULL;
00063     }
00064     if (d_readBlock) {
00065       delete d_readBlock;
00066       d_readBlock = NULL;
00067     }
00068     if (d_writeBlock) {
00069       delete d_writeBlock;
00070       d_writeBlock = NULL;
00071     }
00072   };
00073 
00074 public:
00075   circular_buffer (size_t bufLen_I,
00076                    bool doWriteBlock = true, bool doFullRead = false) {
00077     if (bufLen_I == 0)
00078       throw std::runtime_error ("circular_buffer(): "
00079                                 "Number of items to buffer must be > 0.\n");
00080     d_bufLen_I = bufLen_I;
00081     d_buffer = (T*) new T[d_bufLen_I];
00082     d_doWriteBlock = doWriteBlock;
00083     d_doFullRead = doFullRead;
00084     d_internal = NULL;
00085     d_readBlock = d_writeBlock = NULL;
00086     reset ();
00087     DEBUG (std::cerr << "c_b(): buf len (items) = " << d_bufLen_
00088            << ", doWriteBlock = " << (d_doWriteBlock ? "true" : "false")
00089            << ", doFullRead = " << (d_doFullRead ? "true" : "false")
00090            << std::endl);
00091   };
00092 
00093   ~circular_buffer () {
00094     delete_mutex_cond ();
00095     delete [] d_buffer;
00096   };
00097 
00098   inline size_t n_avail_write_items () {
00099     gruel::scoped_lock l (*d_internal);
00100     size_t retVal = d_n_avail_write_I;
00101     return (retVal);
00102   };
00103 
00104   inline size_t n_avail_read_items () {
00105     gruel::scoped_lock l (*d_internal);
00106     size_t retVal = d_n_avail_read_I;
00107     return (retVal);
00108   };
00109 
00110   inline size_t buffer_length_items () {return (d_bufLen_I);};
00111   inline bool do_write_block () {return (d_doWriteBlock);};
00112   inline bool do_full_read () {return (d_doFullRead);};
00113 
00114   void reset () {
00115     d_doAbort = false;
00116     bzero (d_buffer, d_bufLen_I * sizeof (T));
00117     d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0;
00118     d_n_avail_write_I = d_bufLen_I;
00119     delete_mutex_cond ();
00120     // create a mutex to handle contention of shared resources;
00121     // any routine needed access to shared resources uses lock()
00122     // before doing anything, then unlock() when finished.
00123     d_internal = new gruel::mutex ();
00124     // link the internal mutex to the read and write conditions;
00125     // when wait() is called, the internal mutex will automatically
00126     // be unlock()'ed.  Upon return (from a notify_one() to the condition),
00127     // the internal mutex will be lock()'ed.
00128     d_readBlock = new gruel::condition_variable ();
00129     d_writeBlock = new gruel::condition_variable ();
00130   };
00131 
00132 /*
00133  * enqueue: add the given buffer of item-length to the queue,
00134  *     first-in-first-out (FIFO).
00135  *
00136  * inputs:
00137  *     buf: a pointer to the buffer holding the data
00138  *
00139  *     bufLen_I: the buffer length in items (of the instantiated type)
00140  *
00141  * returns:
00142  *    -1: on overflow (write is not blocking, and data is being
00143  *                     written faster than it is being read)
00144  *     0: if nothing to do (0 length buffer)
00145  *     1: if success
00146  *     2: in the process of aborting, do doing nothing
00147  *
00148  * will throw runtime errors if inputs are improper:
00149  *     buffer pointer is NULL
00150  *     buffer length is larger than the instantiated buffer length
00151  */
00152 
00153   int enqueue (T* buf, size_t bufLen_I) {
00154     DEBUG (std::cerr << "enqueue: buf = " << (void*) buf
00155            << ", bufLen = " << bufLen_I
00156            << ", #av_wr = " << d_n_avail_write_I
00157            << ", #av_rd = " << d_n_avail_read_I << std::endl);
00158     if (bufLen_I > d_bufLen_I) {
00159       std::cerr << "ERROR: cannot add buffer longer ("
00160                 << bufLen_I << ") than instantiated length ("
00161                 << d_bufLen_I << ")." << std::endl;
00162       throw std::runtime_error ("circular_buffer::enqueue()");
00163     }
00164 
00165     if (bufLen_I == 0)
00166       return (0);
00167     if (!buf)
00168       throw std::runtime_error ("circular_buffer::enqueue(): "
00169                                 "input buffer is NULL.\n");
00170     gruel::scoped_lock l (*d_internal);
00171     if (d_doAbort) {
00172       return (2);
00173     }
00174     // set the return value to 1: success; change if needed
00175     int retval = 1;
00176     if (bufLen_I > d_n_avail_write_I) {
00177       if (d_doWriteBlock) {
00178         while (bufLen_I > d_n_avail_write_I) {
00179           DEBUG (std::cerr << "enqueue: #len > #a, waiting." << std::endl);
00180           // wait; will automatically unlock() the internal mutex via
00181           // the scoped lock
00182           d_writeBlock->wait (l);
00183           // and auto re-lock() it here.
00184           if (d_doAbort) {
00185             DEBUG (std::cerr << "enqueue: #len > #a, aborting." << std::endl);
00186             return (2);
00187           }
00188           DEBUG (std::cerr << "enqueue: #len > #a, done waiting." << std::endl);
00189         }
00190       } else {
00191         d_n_avail_read_I = d_bufLen_I - bufLen_I;
00192         d_n_avail_write_I = bufLen_I;
00193         DEBUG (std::cerr << "circular_buffer::enqueue: overflow" << std::endl);
00194         retval = -1;
00195       }
00196     }
00197     size_t n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0;
00198     if (n_now_I > bufLen_I)
00199       n_now_I = bufLen_I;
00200     else if (n_now_I < bufLen_I)
00201       n_start_I = bufLen_I - n_now_I;
00202     bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T));
00203     if (n_start_I) {
00204       bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T));
00205       d_writeNdx_I = n_start_I;
00206     } else
00207       d_writeNdx_I += n_now_I;
00208     d_n_avail_read_I += bufLen_I;
00209     d_n_avail_write_I -= bufLen_I;
00210     d_readBlock->notify_one ();
00211     return (retval);
00212   };
00213 
00214 /*
00215  * dequeue: removes from the queue the number of items requested, or
00216  *     available, into the given buffer on a FIFO basis.
00217  *
00218  * inputs:
00219  *     buf: a pointer to the buffer into which to copy the data
00220  *
00221  *     bufLen_I: pointer to the number of items to remove in items
00222  *         (of the instantiated type)
00223  *
00224  * returns:
00225  *     0: if nothing to do (0 length buffer)
00226  *     1: if success
00227  *     2: in the process of aborting, do doing nothing
00228  *
00229  * will throw runtime errors if inputs are improper:
00230  *     buffer pointer is NULL
00231  *     buffer length pointer is NULL
00232  *     buffer length is larger than the instantiated buffer length
00233  */
00234 
00235   int dequeue (T* buf, size_t* bufLen_I) {
00236     DEBUG (std::cerr << "dequeue: buf = " << ((void*) buf)
00237            << ", *bufLen = " << (*bufLen_I)
00238            << ", #av_wr = " <<  d_n_avail_write_I
00239            << ", #av_rd = " << d_n_avail_read_I << std::endl);
00240     if (!bufLen_I)
00241       throw std::runtime_error ("circular_buffer::dequeue(): "
00242                                 "input bufLen pointer is NULL.\n");
00243     if (!buf)
00244       throw std::runtime_error ("circular_buffer::dequeue(): "
00245                                 "input buffer pointer is NULL.\n");
00246     size_t l_bufLen_I = *bufLen_I;
00247     if (l_bufLen_I == 0)
00248       return (0);
00249     if (l_bufLen_I > d_bufLen_I) {
00250       std::cerr << "ERROR: cannot remove buffer longer ("
00251                 << l_bufLen_I << ") than instantiated length ("
00252                 << d_bufLen_I << ")." << std::endl;
00253       throw std::runtime_error ("circular_buffer::dequeue()");
00254     }
00255 
00256     gruel::scoped_lock l (*d_internal);
00257     if (d_doAbort) {
00258       return (2);
00259     }
00260     if (d_doFullRead) {
00261       while (d_n_avail_read_I < l_bufLen_I) {
00262         DEBUG (std::cerr << "dequeue: #a < #len, waiting." << std::endl);
00263         // wait; will automatically unlock() the internal mutex via
00264         // the scoped lock
00265         d_readBlock->wait (l);
00266         // and re-lock() it here.
00267         if (d_doAbort) {
00268           DEBUG (std::cerr << "dequeue: #a < #len, aborting." << std::endl);
00269           return (2);
00270         }
00271         DEBUG (std::cerr << "dequeue: #a < #len, done waiting." << std::endl);
00272      }
00273     } else {
00274       while (d_n_avail_read_I == 0) {
00275         DEBUG (std::cerr << "dequeue: #a == 0, waiting." << std::endl);
00276         // wait; will automatically unlock() the internal mutex via
00277         // the scoped lock
00278         d_readBlock->wait (l);
00279         // and re-lock() it here.
00280         if (d_doAbort) {
00281           DEBUG (std::cerr << "dequeue: #a == 0, aborting." << std::endl);
00282           return (2);
00283         }
00284         DEBUG (std::cerr << "dequeue: #a == 0, done waiting." << std::endl);
00285       }
00286     }
00287     if (l_bufLen_I > d_n_avail_read_I)
00288       l_bufLen_I = d_n_avail_read_I;
00289     size_t n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0;
00290     if (n_now_I > l_bufLen_I)
00291       n_now_I = l_bufLen_I;
00292     else if (n_now_I < l_bufLen_I)
00293       n_start_I = l_bufLen_I - n_now_I;
00294     bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T));
00295     if (n_start_I) {
00296       bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T));
00297       d_readNdx_I = n_start_I;
00298     } else
00299       d_readNdx_I += n_now_I;
00300     *bufLen_I = l_bufLen_I;
00301     d_n_avail_read_I -= l_bufLen_I;
00302     d_n_avail_write_I += l_bufLen_I;
00303     d_writeBlock->notify_one ();
00304     return (1);
00305   };
00306 
00307   void abort () {
00308     gruel::scoped_lock l (*d_internal);
00309     d_doAbort = true;
00310     d_writeBlock->notify_one ();
00311     d_readBlock->notify_one ();
00312   };
00313 };
00314 
00315 #endif /* _CIRCULAR_BUFFER_H_ */