GNU Radio 3.5.1 C++ API
|
00001 /* -*- c++ -*- */ 00002 /* 00003 * Copyright 2006,2009,2010 Free Software Foundation, Inc. 00004 * 00005 * This file is part of GNU Radio. 00006 * 00007 * GNU Radio is free software; you can redistribute it and/or modify 00008 * it under the terms of the GNU General Public License as published by 00009 * the Free Software Foundation; either version 3, or (at your option) 00010 * any later version. 00011 * 00012 * GNU Radio is distributed in the hope that it will be useful, 00013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 * GNU General Public License for more details. 00016 * 00017 * You should have received a copy of the GNU General Public License 00018 * along with GNU Radio; see the file COPYING. If not, write to 00019 * the Free Software Foundation, Inc., 51 Franklin Street, 00020 * Boston, MA 02110-1301, USA. 00021 */ 00022 00023 #ifndef _CIRCULAR_BUFFER_H_ 00024 #define _CIRCULAR_BUFFER_H_ 00025 00026 #include <gruel/thread.h> 00027 #include <iostream> 00028 #include <stdexcept> 00029 00030 #ifndef DO_DEBUG 00031 #define DO_DEBUG 0 00032 #endif 00033 00034 #if DO_DEBUG 00035 #define DEBUG(X) do{X} while(0); 00036 #else 00037 #define DEBUG(X) do{} while(0); 00038 #endif 00039 00040 template <class T> 00041 class circular_buffer 00042 { 00043 private: 00044 // the buffer to use 00045 T* d_buffer; 00046 00047 // the following are in Items (type T) 00048 size_t d_bufLen_I, d_readNdx_I, d_writeNdx_I; 00049 size_t d_n_avail_write_I, d_n_avail_read_I; 00050 00051 // stuff to control access to class internals 00052 gruel::mutex* d_internal; 00053 gruel::condition_variable* d_readBlock; 00054 gruel::condition_variable* d_writeBlock; 00055 00056 // booleans to decide how to control reading, writing, and aborting 00057 bool d_doWriteBlock, d_doFullRead, d_doAbort; 00058 00059 void delete_mutex_cond () { 00060 if (d_internal) { 00061 delete d_internal; 00062 d_internal = NULL; 00063 } 00064 if (d_readBlock) { 00065 delete d_readBlock; 00066 d_readBlock = NULL; 00067 } 00068 if (d_writeBlock) { 00069 delete d_writeBlock; 00070 d_writeBlock = NULL; 00071 } 00072 }; 00073 00074 public: 00075 circular_buffer (size_t bufLen_I, 00076 bool doWriteBlock = true, bool doFullRead = false) { 00077 if (bufLen_I == 0) 00078 throw std::runtime_error ("circular_buffer(): " 00079 "Number of items to buffer must be > 0.\n"); 00080 d_bufLen_I = bufLen_I; 00081 d_buffer = (T*) new T[d_bufLen_I]; 00082 d_doWriteBlock = doWriteBlock; 00083 d_doFullRead = doFullRead; 00084 d_internal = NULL; 00085 d_readBlock = d_writeBlock = NULL; 00086 reset (); 00087 DEBUG (std::cerr << "c_b(): buf len (items) = " << d_bufLen_ 00088 << ", doWriteBlock = " << (d_doWriteBlock ? "true" : "false") 00089 << ", doFullRead = " << (d_doFullRead ? "true" : "false") 00090 << std::endl); 00091 }; 00092 00093 ~circular_buffer () { 00094 delete_mutex_cond (); 00095 delete [] d_buffer; 00096 }; 00097 00098 inline size_t n_avail_write_items () { 00099 gruel::scoped_lock l (*d_internal); 00100 size_t retVal = d_n_avail_write_I; 00101 return (retVal); 00102 }; 00103 00104 inline size_t n_avail_read_items () { 00105 gruel::scoped_lock l (*d_internal); 00106 size_t retVal = d_n_avail_read_I; 00107 return (retVal); 00108 }; 00109 00110 inline size_t buffer_length_items () {return (d_bufLen_I);}; 00111 inline bool do_write_block () {return (d_doWriteBlock);}; 00112 inline bool do_full_read () {return (d_doFullRead);}; 00113 00114 void reset () { 00115 d_doAbort = false; 00116 bzero (d_buffer, d_bufLen_I * sizeof (T)); 00117 d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0; 00118 d_n_avail_write_I = d_bufLen_I; 00119 delete_mutex_cond (); 00120 // create a mutex to handle contention of shared resources; 00121 // any routine needed access to shared resources uses lock() 00122 // before doing anything, then unlock() when finished. 00123 d_internal = new gruel::mutex (); 00124 // link the internal mutex to the read and write conditions; 00125 // when wait() is called, the internal mutex will automatically 00126 // be unlock()'ed. Upon return (from a notify_one() to the condition), 00127 // the internal mutex will be lock()'ed. 00128 d_readBlock = new gruel::condition_variable (); 00129 d_writeBlock = new gruel::condition_variable (); 00130 }; 00131 00132 /* 00133 * enqueue: add the given buffer of item-length to the queue, 00134 * first-in-first-out (FIFO). 00135 * 00136 * inputs: 00137 * buf: a pointer to the buffer holding the data 00138 * 00139 * bufLen_I: the buffer length in items (of the instantiated type) 00140 * 00141 * returns: 00142 * -1: on overflow (write is not blocking, and data is being 00143 * written faster than it is being read) 00144 * 0: if nothing to do (0 length buffer) 00145 * 1: if success 00146 * 2: in the process of aborting, do doing nothing 00147 * 00148 * will throw runtime errors if inputs are improper: 00149 * buffer pointer is NULL 00150 * buffer length is larger than the instantiated buffer length 00151 */ 00152 00153 int enqueue (T* buf, size_t bufLen_I) { 00154 DEBUG (std::cerr << "enqueue: buf = " << (void*) buf 00155 << ", bufLen = " << bufLen_I 00156 << ", #av_wr = " << d_n_avail_write_I 00157 << ", #av_rd = " << d_n_avail_read_I << std::endl); 00158 if (bufLen_I > d_bufLen_I) { 00159 std::cerr << "ERROR: cannot add buffer longer (" 00160 << bufLen_I << ") than instantiated length (" 00161 << d_bufLen_I << ")." << std::endl; 00162 throw std::runtime_error ("circular_buffer::enqueue()"); 00163 } 00164 00165 if (bufLen_I == 0) 00166 return (0); 00167 if (!buf) 00168 throw std::runtime_error ("circular_buffer::enqueue(): " 00169 "input buffer is NULL.\n"); 00170 gruel::scoped_lock l (*d_internal); 00171 if (d_doAbort) { 00172 return (2); 00173 } 00174 // set the return value to 1: success; change if needed 00175 int retval = 1; 00176 if (bufLen_I > d_n_avail_write_I) { 00177 if (d_doWriteBlock) { 00178 while (bufLen_I > d_n_avail_write_I) { 00179 DEBUG (std::cerr << "enqueue: #len > #a, waiting." << std::endl); 00180 // wait; will automatically unlock() the internal mutex via 00181 // the scoped lock 00182 d_writeBlock->wait (l); 00183 // and auto re-lock() it here. 00184 if (d_doAbort) { 00185 DEBUG (std::cerr << "enqueue: #len > #a, aborting." << std::endl); 00186 return (2); 00187 } 00188 DEBUG (std::cerr << "enqueue: #len > #a, done waiting." << std::endl); 00189 } 00190 } else { 00191 d_n_avail_read_I = d_bufLen_I - bufLen_I; 00192 d_n_avail_write_I = bufLen_I; 00193 DEBUG (std::cerr << "circular_buffer::enqueue: overflow" << std::endl); 00194 retval = -1; 00195 } 00196 } 00197 size_t n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0; 00198 if (n_now_I > bufLen_I) 00199 n_now_I = bufLen_I; 00200 else if (n_now_I < bufLen_I) 00201 n_start_I = bufLen_I - n_now_I; 00202 bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T)); 00203 if (n_start_I) { 00204 bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T)); 00205 d_writeNdx_I = n_start_I; 00206 } else 00207 d_writeNdx_I += n_now_I; 00208 d_n_avail_read_I += bufLen_I; 00209 d_n_avail_write_I -= bufLen_I; 00210 d_readBlock->notify_one (); 00211 return (retval); 00212 }; 00213 00214 /* 00215 * dequeue: removes from the queue the number of items requested, or 00216 * available, into the given buffer on a FIFO basis. 00217 * 00218 * inputs: 00219 * buf: a pointer to the buffer into which to copy the data 00220 * 00221 * bufLen_I: pointer to the number of items to remove in items 00222 * (of the instantiated type) 00223 * 00224 * returns: 00225 * 0: if nothing to do (0 length buffer) 00226 * 1: if success 00227 * 2: in the process of aborting, do doing nothing 00228 * 00229 * will throw runtime errors if inputs are improper: 00230 * buffer pointer is NULL 00231 * buffer length pointer is NULL 00232 * buffer length is larger than the instantiated buffer length 00233 */ 00234 00235 int dequeue (T* buf, size_t* bufLen_I) { 00236 DEBUG (std::cerr << "dequeue: buf = " << ((void*) buf) 00237 << ", *bufLen = " << (*bufLen_I) 00238 << ", #av_wr = " << d_n_avail_write_I 00239 << ", #av_rd = " << d_n_avail_read_I << std::endl); 00240 if (!bufLen_I) 00241 throw std::runtime_error ("circular_buffer::dequeue(): " 00242 "input bufLen pointer is NULL.\n"); 00243 if (!buf) 00244 throw std::runtime_error ("circular_buffer::dequeue(): " 00245 "input buffer pointer is NULL.\n"); 00246 size_t l_bufLen_I = *bufLen_I; 00247 if (l_bufLen_I == 0) 00248 return (0); 00249 if (l_bufLen_I > d_bufLen_I) { 00250 std::cerr << "ERROR: cannot remove buffer longer (" 00251 << l_bufLen_I << ") than instantiated length (" 00252 << d_bufLen_I << ")." << std::endl; 00253 throw std::runtime_error ("circular_buffer::dequeue()"); 00254 } 00255 00256 gruel::scoped_lock l (*d_internal); 00257 if (d_doAbort) { 00258 return (2); 00259 } 00260 if (d_doFullRead) { 00261 while (d_n_avail_read_I < l_bufLen_I) { 00262 DEBUG (std::cerr << "dequeue: #a < #len, waiting." << std::endl); 00263 // wait; will automatically unlock() the internal mutex via 00264 // the scoped lock 00265 d_readBlock->wait (l); 00266 // and re-lock() it here. 00267 if (d_doAbort) { 00268 DEBUG (std::cerr << "dequeue: #a < #len, aborting." << std::endl); 00269 return (2); 00270 } 00271 DEBUG (std::cerr << "dequeue: #a < #len, done waiting." << std::endl); 00272 } 00273 } else { 00274 while (d_n_avail_read_I == 0) { 00275 DEBUG (std::cerr << "dequeue: #a == 0, waiting." << std::endl); 00276 // wait; will automatically unlock() the internal mutex via 00277 // the scoped lock 00278 d_readBlock->wait (l); 00279 // and re-lock() it here. 00280 if (d_doAbort) { 00281 DEBUG (std::cerr << "dequeue: #a == 0, aborting." << std::endl); 00282 return (2); 00283 } 00284 DEBUG (std::cerr << "dequeue: #a == 0, done waiting." << std::endl); 00285 } 00286 } 00287 if (l_bufLen_I > d_n_avail_read_I) 00288 l_bufLen_I = d_n_avail_read_I; 00289 size_t n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0; 00290 if (n_now_I > l_bufLen_I) 00291 n_now_I = l_bufLen_I; 00292 else if (n_now_I < l_bufLen_I) 00293 n_start_I = l_bufLen_I - n_now_I; 00294 bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T)); 00295 if (n_start_I) { 00296 bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T)); 00297 d_readNdx_I = n_start_I; 00298 } else 00299 d_readNdx_I += n_now_I; 00300 *bufLen_I = l_bufLen_I; 00301 d_n_avail_read_I -= l_bufLen_I; 00302 d_n_avail_write_I += l_bufLen_I; 00303 d_writeBlock->notify_one (); 00304 return (1); 00305 }; 00306 00307 void abort () { 00308 gruel::scoped_lock l (*d_internal); 00309 d_doAbort = true; 00310 d_writeBlock->notify_one (); 00311 d_readBlock->notify_one (); 00312 }; 00313 }; 00314 00315 #endif /* _CIRCULAR_BUFFER_H_ */