GNU Radio 3.7.1 C++ API
basic_block.h
Go to the documentation of this file.
00001 /* -*- c++ -*- */
00002 /*
00003  * Copyright 2006,2008,2009,2011,2013 Free Software Foundation, Inc.
00004  *
00005  * This file is part of GNU Radio
00006  *
00007  * GNU Radio is free software; you can redistribute it and/or modify
00008  * it under the terms of the GNU General Public License as published by
00009  * the Free Software Foundation; either version 3, or (at your option)
00010  * any later version.
00011  *
00012  * GNU Radio is distributed in the hope that it will be useful,
00013  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00014  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00015  * GNU General Public License for more details.
00016  *
00017  * You should have received a copy of the GNU General Public License
00018  * along with GNU Radio; see the file COPYING.  If not, write to
00019  * the Free Software Foundation, Inc., 51 Franklin Street,
00020  * Boston, MA 02110-1301, USA.
00021  */
00022 
00023 #ifndef INCLUDED_GR_BASIC_BLOCK_H
00024 #define INCLUDED_GR_BASIC_BLOCK_H
00025 
00026 #include <gnuradio/api.h>
00027 #include <gnuradio/sptr_magic.h>
00028 #include <gnuradio/msg_accepter.h>
00029 #include <gnuradio/runtime_types.h>
00030 #include <gnuradio/io_signature.h>
00031 #include <gnuradio/thread/thread.h>
00032 #include <boost/enable_shared_from_this.hpp>
00033 #include <boost/function.hpp>
00034 #include <boost/foreach.hpp>
00035 #include <boost/thread/condition_variable.hpp>
00036 #include <iostream>
00037 #include <string>
00038 #include <deque>
00039 #include <map>
00040 
00041 #ifdef GR_CTRLPORT
00042 #include <gnuradio/rpcregisterhelpers.h>
00043 #endif
00044 
00045 namespace gr {
00046 
00047   /*!
00048    * \brief The abstract base class for all signal processing blocks.
00049    * \ingroup internal
00050    *
00051    * Basic blocks are the bare abstraction of an entity that has a
00052    * name, a set of inputs and outputs, and a message queue.  These
00053    * are never instantiated directly; rather, this is the abstract
00054    * parent class of both hier_block2, which is a recursive
00055    * container, and block, which implements actual signal
00056    * processing functions.
00057    */
00058   class GR_RUNTIME_API basic_block : public msg_accepter,
00059                                      public boost::enable_shared_from_this<basic_block>
00060   {
00061     typedef boost::function<void(pmt::pmt_t)> msg_handler_t; // callable that receives one message
00062   
00063   private:
00064     // Message handlers: one per port, keyed by port id and ordered by pmt::comperator
00065     typedef std::map<pmt::pmt_t , msg_handler_t, pmt::comperator> d_msg_handlers_t;
00066     d_msg_handlers_t d_msg_handlers; // looked up by has_msg_handler() and dispatch_msg(), written by set_msg_handler()
00067   
00068     typedef std::deque<pmt::pmt_t> msg_queue_t; // messages queued on a single port
00069     typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comperator> msg_queue_map_t; // port id -> that port's queue
00070     typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comperator>::iterator msg_queue_map_itr;
00071     std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::comperator> msg_queue_ready; // one condition variable per port id; not used in this header
00072   
00073     gr::thread::mutex mutex;          //!< protects all vars
00074   
00075   protected:
00076     friend class flowgraph;
00077     friend class flat_flowgraph; // TODO: will be redundant
00078     friend class tpb_thread_body;
00079   
00080     enum vcolor { WHITE, GREY, BLACK }; // values for d_color, see set_color()/color()
00081   
00082     std::string       d_name;                    // returned by name()
00083     gr::io_signature::sptr d_input_signature;    // returned by input_signature(), replaced by set_input_signature()
00084     gr::io_signature::sptr d_output_signature;   // returned by output_signature(), replaced by set_output_signature()
00085     long              d_unique_id;               // returned by unique_id(); operator< on basic_block_sptr compares it
00086     long              d_symbolic_id;             // returned by symbolic_id()
00087     std::string       d_symbol_name;             // returned by symbol_name(); alias() falls back to it
00088     std::string       d_symbol_alias;            // empty means no alias is set (see alias_set())
00089     vcolor            d_color;                   // written by set_color(), read by color()
00090     bool              d_rpc_set;                 // set to true by rpc_set(), read by is_rpc_set()
00091 
00092     msg_queue_map_t msg_queue;                   // port id -> queue of messages for that port
00093     std::vector<boost::any> d_rpc_vars; // container for all RPC variables
00094   
00095     basic_block(void) {} // allows pure virtual interface sub-classes; NOTE: the empty body leaves the scalar members (ids, d_color, d_rpc_set) unset
00096   
00097     //! Protected constructor prevents instantiation by non-derived classes (declared here, defined out of line)
00098     basic_block(const std::string &name,
00099                 gr::io_signature::sptr input_signature,
00100                 gr::io_signature::sptr output_signature);
00101   
00102     //! Stores \p iosig as the input signature; may only be called during constructor
00103     void set_input_signature(gr::io_signature::sptr iosig) {
00104       this->d_input_signature = iosig;
00105     }
00106   
00107     //! Stores \p iosig as the output signature; may only be called during constructor
00108     void set_output_signature(gr::io_signature::sptr iosig) {
00109       this->d_output_signature = iosig;
00110     }
00111   
00112     /*!
00113      * \brief Marker the flowgraph sets and reads back for sorting and partitioning
00114      */
00115     void set_color(vcolor color) { this->d_color = color; }
00116     vcolor color() const { return this->d_color; }
00117 
00118     /*!
00119      * \brief Reports whether a handler is registered for port \p which_port
00120      */
00121     virtual bool has_msg_handler(pmt::pmt_t which_port) {
00122       return d_msg_handlers.count(which_port) != 0;
00123     }
00124 
00125     /*!
00126      * \brief Called by the runtime system to dispatch messages.
00127      *
00128      * The thread-safety guarantees mentioned in set_msg_handler are
00129      * implemented by the callers of this method.
00130      */
00131     virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
00132     {
00133       // A port without a registered handler is skipped silently.
00134       if(!has_msg_handler(which_port))
00135         return;
00136       d_msg_handlers[which_port](msg);
00137     }
00138   
00139     // Message passing interface
00140     pmt::pmt_t message_subscribers; // queried as a PMT dictionary keyed by port id in has_msg_port()
00141   
00142   public:
00143     virtual ~basic_block();
00144     long unique_id() const { return this->d_unique_id; }           //!< value of d_unique_id
00145     long symbolic_id() const { return this->d_symbolic_id; }       //!< value of d_symbolic_id
00146     std::string name() const { return this->d_name; }              //!< copy of d_name
00147     std::string symbol_name() const { return this->d_symbol_name; } //!< copy of d_symbol_name
00148     gr::io_signature::sptr input_signature() const  { return this->d_input_signature; }
00149     gr::io_signature::sptr output_signature() const { return this->d_output_signature; }
00150     basic_block_sptr to_basic_block(); // Needed for Python type coercion
00151     bool alias_set() { return d_symbol_alias.size() != 0; }        //!< true once a non-empty alias is stored
00152     std::string alias() { if(alias_set()) return d_symbol_alias; return symbol_name(); } //!< the alias if set, otherwise symbol_name()
00153     pmt::pmt_t alias_pmt() { const std::string label = alias(); return pmt::intern(label); } //!< alias() passed through pmt::intern()
00154     void set_block_alias(std::string name);
00155   
00156     // ** Message passing interface ** (declarations only; the definitions are not part of this header)
00157     void message_port_register_in(pmt::pmt_t port_id);   // presumably registers port_id as an input message port -- confirm in the implementation
00158     void message_port_register_out(pmt::pmt_t port_id);  // presumably registers port_id as an output message port -- confirm in the implementation
00159     void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg);     // presumably publishes msg on port_id -- confirm
00160     void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);  // presumably subscribes target to port_id -- confirm
00161     void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target); // presumably undoes message_port_sub -- confirm
00162   
00163     virtual bool message_port_is_hier(pmt::pmt_t port_id) { (void) port_id; return false; }     //!< always false in this base class; virtual so subclasses can answer differently
00164     virtual bool message_port_is_hier_in(pmt::pmt_t port_id) { (void) port_id; return false; }  //!< always false in this base class; virtual so subclasses can answer differently
00165     virtual bool message_port_is_hier_out(pmt::pmt_t port_id) { (void) port_id; return false; } //!< always false in this base class; virtual so subclasses can answer differently
00166   
00167     /*!
00168      * \brief Get input message port names.
00169      *
00170      * \returns the available input message ports for a block, as a
00171      * PMT vector that is filled with PMT symbols.
00172      */
00173     pmt::pmt_t message_ports_in();
00174   
00175     /*!
00176      * \brief Get output message port names.
00177      *
00178      * \returns the available output message ports for a block, as a
00179      * PMT vector that is filled with PMT symbols.
00180      */
00181     pmt::pmt_t message_ports_out();
00182   
00183     /*!
00184      * \brief Accept \p msg on \p which_port, place it in the queue, and arrange for the thread to be awakened if it's not already.
00185      */
00186     void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
00187   
00188     //! Is the queue of \p which_port empty? Throws std::runtime_error if the port is unknown.
00189     bool empty_p(pmt::pmt_t which_port) {
00190       const msg_queue_map_itr port_queue = msg_queue.find(which_port);
00191       if(port_queue == msg_queue.end()) throw std::runtime_error("port does not exist!");
00192       return port_queue->second.empty();
00193     }
00194     //! Are the queues of all ports in msg_queue empty?
00195     bool empty_p() {
00196       for(msg_queue_map_itr q = msg_queue.begin(); q != msg_queue.end(); ++q) {
00197         if(!q->second.empty()) return false; // a single pending message decides the answer
00198       }
00199       return true;
00200     }
00201 
00202     //! True when \p which_port has no queued messages or has no handler attached.
00203     bool empty_handled_p(pmt::pmt_t which_port) {
00204       return empty_p(which_port) ? true : !has_msg_handler(which_port);
00205     }
00206     //! Are all msg ports with handlers empty? Every port in msg_queue is checked.
00207     bool empty_handled_p() {
00208       bool all_drained = true;
00209       for(msg_queue_map_itr q = msg_queue.begin(); q != msg_queue.end(); ++q)
00210         all_drained = empty_handled_p(q->first) && all_drained;
00211       return all_drained;
00212     }
00213 
00214     //! How many messages are queued on \p which_port? Throws std::runtime_error if the port is unknown.
00215     size_t nmsgs(pmt::pmt_t which_port) {
00216       const msg_queue_map_itr port_queue = msg_queue.find(which_port);
00217       if(port_queue == msg_queue.end()) throw std::runtime_error("port does not exist!");
00218       return port_queue->second.size();
00219     }
00220   
00221     //! Acquires and releases the mutex
00222     void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg);
00223     /*!
00224      * \returns pmt at head of queue or pmt::pmt_t() if empty.
00225      */
00226     pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
00227   
00228     /*!
00229      * \returns pmt at head of queue or pmt::pmt_t() if empty. NOTE(review): same wording as delete_head_nowait; confirm what the blocking variant does on an empty queue.
00230      */
00231     pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port);
00232     //! Iterator to the first queued message of \p which_port.
00233     msg_queue_t::iterator get_iterator(pmt::pmt_t which_port) {
00234       msg_queue_t &port_queue = msg_queue[which_port]; // operator[] adds an empty queue for an unknown port
00235       return port_queue.begin();
00236     }
00237     void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it) {
00238       msg_queue_t &port_queue = msg_queue[which_port]; // `it` is expected to point into this queue
00239       port_queue.erase(it);
00240     }
00241     /*!
00242      * \brief Tests whether \p which_port names a message port of this block.
00243      *
00244      * A port counts as present if it has an entry in msg_queue or is
00245      * a key of the message_subscribers dictionary.
00246      */
00247     virtual bool has_msg_port(pmt::pmt_t which_port) {
00248       return msg_queue.count(which_port) != 0 || pmt::dict_has_key(message_subscribers, which_port);
00249     }
00250 
00251 #ifdef GR_CTRLPORT
00252     /*!
00253      * \brief Add an RPC variable (get or set).
00254      *
00255      * Controlport creates new getters/setters that have to be kept
00256      * somewhere. Each block has a vector for this purpose, and the
00257      * entries never need to be accessed again once they are
00258      * registered with the RPC backend. The argument is a
00259      * boost::shared_ptr<rpcbasic_base> so that when the block is
00260      * deleted, all RPC registered variables are cleaned up.
00261      *
00262      * \param s an rpcbasic_sptr of the new RPC variable register to store.
00263      */
00264     void add_rpc_variable(rpcbasic_sptr s)
00265     {
00266       d_rpc_vars.push_back(boost::any(s)); // d_rpc_vars holds boost::any, so the pointer is stored type-erased
00267     }
00268 #endif /* GR_CTRLPORT */
00269 
00270     /*!
00271      * \brief Set up the RPC registered variables.
00272      *
00273      * A block that wants to use controlport must override this. The
00274      * override is where rpcbasic_register_{get,set} pointers are
00275      * created, wrapped as shared pointers (rpcbasic_sptr(...)) and
00276      * stored using add_rpc_variable. This default does nothing.
00277      */
00278     virtual void setup_rpc() {}
00279 
00280     /*!
00281      * \brief Has this block already been registered to the RPC?
00282      *
00283      * A block can only be registered once, so this flag is used to
00284      * guard against registering it multiple times.
00285      */
00286     bool is_rpc_set() { return this->d_rpc_set; }
00287 
00288     /*!
00289      * \brief Marks the block as registered; call it when the block is registered with the RPC.
00290      */
00291     void rpc_set() { this->d_rpc_set = true; }
00292   
00293     /*!
00294      * \brief Confirm that ninputs and noutputs is an acceptable combination.
00295      *
00296      * \param ninputs   number of input streams connected
00297      * \param noutputs  number of output streams connected
00298      *
00299      * \returns true if this is a valid configuration for this block.
00300      *
00301      * The runtime system calls this function whenever the topology
00302      * changes. Most classes do not need to override it; this default
00303      * accepts every combination. The check is in addition to the
00304      * constraints given by the input and output gr::io_signatures.
00305      */
00306     virtual bool check_topology(int ninputs, int noutputs) {
00307       static_cast<void>(ninputs);  // neither count is examined:
00308       static_cast<void>(noutputs); // the default accepts every combination
00309       return true;
00310     }
00311   
00312     /*!
00313      * \brief Register the callback that is fired when messages are available.
00314      *
00315      * \p msg_handler can be any kind of function pointer or function
00316      * object that has the signature:
00317      * <pre>
00318      *    void msg_handler(pmt::pmt_t msg);
00319      * </pre>
00320      *
00321      * (boost::bind is handy for massaging a callable into the
00322      * correct form.  See gr::blocks::nop for an example that sets
00323      * up a class method as the callback.)
00324      *
00325      * Blocks that desire to handle messages must call this method in
00326      * their constructors to register the handler. \p which_port must
00327      * already be in msg_queue, else std::runtime_error is thrown.
00328      *
00329      * If the block inherits from block, the runtime system will
00330      * ensure that msg_handler is called in a thread-safe manner, such
00331      * that work and msg_handler will never be called concurrently.
00332      * This allows msg_handler to update state variables without
00333      * having to worry about thread-safety issues with work,
00334      * general_work or another invocation of msg_handler.
00335      *
00336      * If the block inherits from hier_block2, the runtime system
00337      * will ensure that no reentrant calls are made to msg_handler.
00338      */
00339     template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler) {
00340       const bool port_is_registered = msg_queue.count(which_port) != 0;
00341       if(!port_is_registered)
00342         throw std::runtime_error("attempt to set_msg_handler() on bad input message port!");
00343       d_msg_handlers[which_port] = msg_handler_t(msg_handler);
00344     }
00345     //! Base-class stub: always throws std::runtime_error. \p mask is not used here.
00346     virtual void set_processor_affinity(const std::vector<int> &mask)
00347     { (void)mask; throw std::runtime_error("set_processor_affinity not overloaded in child class."); }
00348     //! Base-class stub: always throws std::runtime_error.
00349     virtual void unset_processor_affinity()
00350     { throw std::runtime_error("unset_processor_affinity not overloaded in child class."); }
00351     //! Base-class stub: always throws std::runtime_error, so no vector is ever returned from here.
00352     virtual std::vector<int> processor_affinity()
00353     { throw std::runtime_error("processor_affinity not overloaded in child class."); }
00354   };
00355   //! Orders block pointers by their unique_id(); both pointers are dereferenced unconditionally.
00356   inline bool operator<(basic_block_sptr lhs, basic_block_sptr rhs)
00357   {
00358     return rhs->unique_id() > lhs->unique_id();
00359   }
00360 
00361   typedef std::vector<basic_block_sptr> basic_block_vector_t;           // vector of basic_block_sptr
00362   typedef std::vector<basic_block_sptr>::iterator basic_block_viter_t; // iterator over a basic_block_vector_t
00363 
00364   GR_RUNTIME_API long basic_block_ncurrently_allocated(); // defined elsewhere; presumably the number of live basic_block objects -- confirm
00365   //! Streams a block as its name() followed by its unique_id() in parentheses.
00366   inline std::ostream &operator << (std::ostream &os, basic_block_sptr basic_block)
00367   {
00368     // each insertion hands back the same stream, so the chain's result is os itself
00369     return os << basic_block->name() << "(" << basic_block->unique_id() << ")";
00370   }
00371 
00372 } /* namespace gr */
00373 
00374 #endif /* INCLUDED_GR_BASIC_BLOCK_H */