GNU Radio 3.7.1 C++ API
|
00001 /* -*- c++ -*- */ 00002 /* 00003 * Copyright 2006,2008,2009,2011,2013 Free Software Foundation, Inc. 00004 * 00005 * This file is part of GNU Radio 00006 * 00007 * GNU Radio is free software; you can redistribute it and/or modify 00008 * it under the terms of the GNU General Public License as published by 00009 * the Free Software Foundation; either version 3, or (at your option) 00010 * any later version. 00011 * 00012 * GNU Radio is distributed in the hope that it will be useful, 00013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 * GNU General Public License for more details. 00016 * 00017 * You should have received a copy of the GNU General Public License 00018 * along with GNU Radio; see the file COPYING. If not, write to 00019 * the Free Software Foundation, Inc., 51 Franklin Street, 00020 * Boston, MA 02110-1301, USA. 00021 */ 00022 00023 #ifndef INCLUDED_GR_BASIC_BLOCK_H 00024 #define INCLUDED_GR_BASIC_BLOCK_H 00025 00026 #include <gnuradio/api.h> 00027 #include <gnuradio/sptr_magic.h> 00028 #include <gnuradio/msg_accepter.h> 00029 #include <gnuradio/runtime_types.h> 00030 #include <gnuradio/io_signature.h> 00031 #include <gnuradio/thread/thread.h> 00032 #include <boost/enable_shared_from_this.hpp> 00033 #include <boost/function.hpp> 00034 #include <boost/foreach.hpp> 00035 #include <boost/thread/condition_variable.hpp> 00036 #include <iostream> 00037 #include <string> 00038 #include <deque> 00039 #include <map> 00040 00041 #ifdef GR_CTRLPORT 00042 #include <gnuradio/rpcregisterhelpers.h> 00043 #endif 00044 00045 namespace gr { 00046 00047 /*! 00048 * \brief The abstract base class for all signal processing blocks. 00049 * \ingroup internal 00050 * 00051 * Basic blocks are the bare abstraction of an entity that has a 00052 * name, a set of inputs and outputs, and a message queue. These 00053 * are never instantiated directly; rather, this is the abstract 00054 * parent class of both gr_hier_block, which is a recursive 00055 * container, and block, which implements actual signal 00056 * processing functions. 00057 */ 00058 class GR_RUNTIME_API basic_block : public msg_accepter, 00059 public boost::enable_shared_from_this<basic_block> 00060 { 00061 typedef boost::function<void(pmt::pmt_t)> msg_handler_t; 00062 00063 private: 00064 //msg_handler_t d_msg_handler; 00065 typedef std::map<pmt::pmt_t , msg_handler_t, pmt::comperator> d_msg_handlers_t; 00066 d_msg_handlers_t d_msg_handlers; 00067 00068 typedef std::deque<pmt::pmt_t> msg_queue_t; 00069 typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comperator> msg_queue_map_t; 00070 typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comperator>::iterator msg_queue_map_itr; 00071 std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::comperator> msg_queue_ready; 00072 00073 gr::thread::mutex mutex; //< protects all vars 00074 00075 protected: 00076 friend class flowgraph; 00077 friend class flat_flowgraph; // TODO: will be redundant 00078 friend class tpb_thread_body; 00079 00080 enum vcolor { WHITE, GREY, BLACK }; 00081 00082 std::string d_name; 00083 gr::io_signature::sptr d_input_signature; 00084 gr::io_signature::sptr d_output_signature; 00085 long d_unique_id; 00086 long d_symbolic_id; 00087 std::string d_symbol_name; 00088 std::string d_symbol_alias; 00089 vcolor d_color; 00090 bool d_rpc_set; 00091 00092 msg_queue_map_t msg_queue; 00093 std::vector<boost::any> d_rpc_vars; // container for all RPC variables 00094 00095 basic_block(void) {} // allows pure virtual interface sub-classes 00096 00097 //! Protected constructor prevents instantiation by non-derived classes 00098 basic_block(const std::string &name, 00099 gr::io_signature::sptr input_signature, 00100 gr::io_signature::sptr output_signature); 00101 00102 //! may only be called during constructor 00103 void set_input_signature(gr::io_signature::sptr iosig) { 00104 d_input_signature = iosig; 00105 } 00106 00107 //! may only be called during constructor 00108 void set_output_signature(gr::io_signature::sptr iosig) { 00109 d_output_signature = iosig; 00110 } 00111 00112 /*! 00113 * \brief Allow the flowgraph to set for sorting and partitioning 00114 */ 00115 void set_color(vcolor color) { d_color = color; } 00116 vcolor color() const { return d_color; } 00117 00118 /*! 00119 * \brief Tests if there is a handler attached to port \p which_port 00120 */ 00121 virtual bool has_msg_handler(pmt::pmt_t which_port) { 00122 return (d_msg_handlers.find(which_port) != d_msg_handlers.end()); 00123 } 00124 00125 /* 00126 * This function is called by the runtime system to dispatch messages. 00127 * 00128 * The thread-safety guarantees mentioned in set_msg_handler are 00129 * implemented by the callers of this method. 00130 */ 00131 virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg) 00132 { 00133 // AA Update this 00134 if(has_msg_handler(which_port)) { // Is there a handler? 00135 d_msg_handlers[which_port](msg); // Yes, invoke it. 00136 } 00137 } 00138 00139 // Message passing interface 00140 pmt::pmt_t message_subscribers; 00141 00142 public: 00143 virtual ~basic_block(); 00144 long unique_id() const { return d_unique_id; } 00145 long symbolic_id() const { return d_symbolic_id; } 00146 std::string name() const { return d_name; } 00147 std::string symbol_name() const { return d_symbol_name; } 00148 gr::io_signature::sptr input_signature() const { return d_input_signature; } 00149 gr::io_signature::sptr output_signature() const { return d_output_signature; } 00150 basic_block_sptr to_basic_block(); // Needed for Python type coercion 00151 bool alias_set() { return !d_symbol_alias.empty(); } 00152 std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); } 00153 pmt::pmt_t alias_pmt(){ return pmt::intern(alias()); } 00154 void set_block_alias(std::string name); 00155 00156 // ** Message passing interface ** 00157 void message_port_register_in(pmt::pmt_t port_id); 00158 void message_port_register_out(pmt::pmt_t port_id); 00159 void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg); 00160 void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target); 00161 void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target); 00162 00163 virtual bool message_port_is_hier(pmt::pmt_t port_id) { (void) port_id; std::cout << "is_hier\n"; return false; } 00164 virtual bool message_port_is_hier_in(pmt::pmt_t port_id) { (void) port_id; std::cout << "is_hier_in\n"; return false; } 00165 virtual bool message_port_is_hier_out(pmt::pmt_t port_id) { (void) port_id; std::cout << "is_hier_out\n"; return false; } 00166 00167 /*! 00168 * \brief Get input message port names. 00169 * 00170 * Returns the available input message ports for a block. The 00171 * return object is a PMT vector that is filled with PMT symbols. 00172 */ 00173 pmt::pmt_t message_ports_in(); 00174 00175 /*! 00176 * \brief Get output message port names. 00177 * 00178 * Returns the available output message ports for a block. The 00179 * return object is a PMT vector that is filled with PMT symbols. 00180 */ 00181 pmt::pmt_t message_ports_out(); 00182 00183 /*! 00184 * Accept msg, place in queue, arrange for thread to be awakened if it's not already. 00185 */ 00186 void _post(pmt::pmt_t which_port, pmt::pmt_t msg); 00187 00188 //! is the queue empty? 00189 bool empty_p(pmt::pmt_t which_port) { 00190 if(msg_queue.find(which_port) == msg_queue.end()) 00191 throw std::runtime_error("port does not exist!"); 00192 return msg_queue[which_port].empty(); 00193 } 00194 bool empty_p() { 00195 bool rv = true; 00196 BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue) { 00197 rv &= msg_queue[i.first].empty(); 00198 } 00199 return rv; 00200 } 00201 00202 //! are all msg ports with handlers empty? 00203 bool empty_handled_p(pmt::pmt_t which_port){ 00204 return (empty_p(which_port) || !has_msg_handler(which_port)); 00205 } 00206 bool empty_handled_p() { 00207 bool rv = true; 00208 BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue) { 00209 rv &= empty_handled_p(i.first); 00210 } 00211 return rv; 00212 } 00213 00214 //! How many messages in the queue? 00215 size_t nmsgs(pmt::pmt_t which_port) { 00216 if(msg_queue.find(which_port) == msg_queue.end()) 00217 throw std::runtime_error("port does not exist!"); 00218 return msg_queue[which_port].size(); 00219 } 00220 00221 //| Acquires and release the mutex 00222 void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg); 00223 /*! 00224 * \returns returns pmt at head of queue or pmt::pmt_t() if empty. 00225 */ 00226 pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port); 00227 00228 /*! 00229 * \returns returns pmt at head of queue or pmt::pmt_t() if empty. 00230 */ 00231 pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port); 00232 00233 msg_queue_t::iterator get_iterator(pmt::pmt_t which_port) { 00234 return msg_queue[which_port].begin(); 00235 } 00236 00237 void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it) { 00238 msg_queue[which_port].erase(it); 00239 } 00240 00241 virtual bool has_msg_port(pmt::pmt_t which_port) { 00242 if(msg_queue.find(which_port) != msg_queue.end()) { 00243 return true; 00244 } 00245 if(pmt::dict_has_key(message_subscribers, which_port)) { 00246 return true; 00247 } 00248 return false; 00249 } 00250 00251 #ifdef GR_CTRLPORT 00252 /*! 00253 * \brief Add an RPC variable (get or set). 00254 * 00255 * Using controlport, we create new getters/setters and need to 00256 * store them. Each block has a vector to do this, and these never 00257 * need to be accessed again once they are registered with the RPC 00258 * backend. This function takes a 00259 * boost::shared_sptr<rpcbasic_base> so that when the block is 00260 * deleted, all RPC registered variables are cleaned up. 00261 * 00262 * \param s an rpcbasic_sptr of the new RPC variable register to store. 00263 */ 00264 void add_rpc_variable(rpcbasic_sptr s) 00265 { 00266 d_rpc_vars.push_back(s); 00267 } 00268 #endif /* GR_CTRLPORT */ 00269 00270 /*! 00271 * \brief Set up the RPC registered variables. 00272 * 00273 * This must be overloaded by a block that wants to use 00274 * controlport. This is where rpcbasic_register_{get,set} pointers 00275 * are created, which then get wrapped as shared pointers 00276 * (rpcbasic_sptr(...)) and stored using add_rpc_variable. 00277 */ 00278 virtual void setup_rpc() {}; 00279 00280 /*! 00281 * \brief Ask if this block has been registered to the RPC. 00282 * 00283 * We can only register a block once, so we use this to protect us 00284 * from calling it multiple times. 00285 */ 00286 bool is_rpc_set() { return d_rpc_set; } 00287 00288 /*! 00289 * \brief When the block is registered with the RPC, set this. 00290 */ 00291 void rpc_set() { d_rpc_set = true; } 00292 00293 /*! 00294 * \brief Confirm that ninputs and noutputs is an acceptable combination. 00295 * 00296 * \param ninputs number of input streams connected 00297 * \param noutputs number of output streams connected 00298 * 00299 * \returns true if this is a valid configuration for this block. 00300 * 00301 * This function is called by the runtime system whenever the 00302 * topology changes. Most classes do not need to override this. 00303 * This check is in addition to the constraints specified by the 00304 * input and output gr::io_signatures. 00305 */ 00306 virtual bool check_topology(int ninputs, int noutputs) { 00307 (void)ninputs; 00308 (void)noutputs; 00309 return true; 00310 } 00311 00312 /*! 00313 * \brief Set the callback that is fired when messages are available. 00314 * 00315 * \p msg_handler can be any kind of function pointer or function object 00316 * that has the signature: 00317 * <pre> 00318 * void msg_handler(pmt::pmt msg); 00319 * </pre> 00320 * 00321 * (You may want to use boost::bind to massage your callable into 00322 * the correct form. See gr::blocks::nop for an example that sets 00323 * up a class method as the callback.) 00324 * 00325 * Blocks that desire to handle messages must call this method in 00326 * their constructors to register the handler that will be invoked 00327 * when messages are available. 00328 * 00329 * If the block inherits from block, the runtime system will 00330 * ensure that msg_handler is called in a thread-safe manner, such 00331 * that work and msg_handler will never be called concurrently. 00332 * This allows msg_handler to update state variables without 00333 * having to worry about thread-safety issues with work, 00334 * general_work or another invocation of msg_handler. 00335 * 00336 * If the block inherits from hier_block2, the runtime system 00337 * will ensure that no reentrant calls are made to msg_handler. 00338 */ 00339 template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler) { 00340 if(msg_queue.find(which_port) == msg_queue.end()) { 00341 throw std::runtime_error("attempt to set_msg_handler() on bad input message port!"); 00342 } 00343 d_msg_handlers[which_port] = msg_handler_t(msg_handler); 00344 } 00345 00346 virtual void set_processor_affinity(const std::vector<int> &mask) 00347 { throw std::runtime_error("set_processor_affinity not overloaded in child class."); } 00348 00349 virtual void unset_processor_affinity() 00350 { throw std::runtime_error("unset_processor_affinity not overloaded in child class."); } 00351 00352 virtual std::vector<int> processor_affinity() 00353 { throw std::runtime_error("processor_affinity not overloaded in child class."); } 00354 }; 00355 00356 inline bool operator<(basic_block_sptr lhs, basic_block_sptr rhs) 00357 { 00358 return lhs->unique_id() < rhs->unique_id(); 00359 } 00360 00361 typedef std::vector<basic_block_sptr> basic_block_vector_t; 00362 typedef std::vector<basic_block_sptr>::iterator basic_block_viter_t; 00363 00364 GR_RUNTIME_API long basic_block_ncurrently_allocated(); 00365 00366 inline std::ostream &operator << (std::ostream &os, basic_block_sptr basic_block) 00367 { 00368 os << basic_block->name() << "(" << basic_block->unique_id() << ")"; 00369 return os; 00370 } 00371 00372 } /* namespace gr */ 00373 00374 #endif /* INCLUDED_GR_BASIC_BLOCK_H */