diff options
Diffstat (limited to 'gnuradio-runtime/include/gnuradio/basic_block.h')
-rw-r--r-- | gnuradio-runtime/include/gnuradio/basic_block.h | 354 |
1 files changed, 354 insertions, 0 deletions
diff --git a/gnuradio-runtime/include/gnuradio/basic_block.h b/gnuradio-runtime/include/gnuradio/basic_block.h new file mode 100644 index 0000000000..be385465d1 --- /dev/null +++ b/gnuradio-runtime/include/gnuradio/basic_block.h @@ -0,0 +1,354 @@ +/* -*- c++ -*- */ +/* + * Copyright 2006,2008,2009,2011,2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_GR_BASIC_BLOCK_H +#define INCLUDED_GR_BASIC_BLOCK_H + +#include <gnuradio/api.h> +#include <gnuradio/sptr_magic.h> +#include <gnuradio/msg_accepter.h> +#include <gnuradio/runtime_types.h> +#include <gnuradio/io_signature.h> +#include <gnuradio/thread/thread.h> +#include <boost/enable_shared_from_this.hpp> +#include <boost/function.hpp> +#include <boost/foreach.hpp> +#include <boost/thread/condition_variable.hpp> +#include <iostream> +#include <string> +#include <deque> +#include <map> + +#ifdef GR_CTRLPORT +#include <gnuradio/rpcregisterhelpers.h> +#endif + +namespace gr { + + /*! + * \brief The abstract base class for all signal processing blocks. + * \ingroup internal + * + * Basic blocks are the bare abstraction of an entity that has a + * name, a set of inputs and outputs, and a message queue. These + * are never instantiated directly; rather, this is the abstract + * parent class of both gr_hier_block, which is a recursive + * container, and block, which implements actual signal + * processing functions. + */ + class GR_RUNTIME_API basic_block : public msg_accepter, + public boost::enable_shared_from_this<basic_block> + { + typedef boost::function<void(pmt::pmt_t)> msg_handler_t; + + private: + //msg_handler_t d_msg_handler; + typedef std::map<pmt::pmt_t , msg_handler_t, pmt::comperator> d_msg_handlers_t; + d_msg_handlers_t d_msg_handlers; + + typedef std::deque<pmt::pmt_t> msg_queue_t; + typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comperator> msg_queue_map_t; + typedef std::map<pmt::pmt_t, msg_queue_t, pmt::comperator>::iterator msg_queue_map_itr; + std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::comperator> msg_queue_ready; + + gr::thread::mutex mutex; //< protects all vars + + protected: + friend class flowgraph; + friend class flat_flowgraph; // TODO: will be redundant + friend class tpb_thread_body; + + enum vcolor { WHITE, GREY, BLACK }; + + std::string d_name; + gr::io_signature::sptr d_input_signature; + gr::io_signature::sptr d_output_signature; + long d_unique_id; + long d_symbolic_id; + std::string d_symbol_name; + std::string d_symbol_alias; + vcolor d_color; + bool d_rpc_set; + + msg_queue_map_t msg_queue; + std::vector<boost::any> d_rpc_vars; // container for all RPC variables + + basic_block(void) {} // allows pure virtual interface sub-classes + + //! Protected constructor prevents instantiation by non-derived classes + basic_block(const std::string &name, + gr::io_signature::sptr input_signature, + gr::io_signature::sptr output_signature); + + //! may only be called during constructor + void set_input_signature(gr::io_signature::sptr iosig) { + d_input_signature = iosig; + } + + //! may only be called during constructor + void set_output_signature(gr::io_signature::sptr iosig) { + d_output_signature = iosig; + } + + /*! + * \brief Allow the flowgraph to set for sorting and partitioning + */ + void set_color(vcolor color) { d_color = color; } + vcolor color() const { return d_color; } + + /*! + * \brief Tests if there is a handler attached to port \p which_port + */ + bool has_msg_handler(pmt::pmt_t which_port) { + return (d_msg_handlers.find(which_port) != d_msg_handlers.end()); + } + + /* + * This function is called by the runtime system to dispatch messages. + * + * The thread-safety guarantees mentioned in set_msg_handler are + * implemented by the callers of this method. + */ + virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg) + { + // AA Update this + if(has_msg_handler(which_port)) { // Is there a handler? + d_msg_handlers[which_port](msg); // Yes, invoke it. + } + } + + // Message passing interface + pmt::pmt_t message_subscribers; + + public: + virtual ~basic_block(); + long unique_id() const { return d_unique_id; } + long symbolic_id() const { return d_symbolic_id; } + std::string name() const { return d_name; } + std::string symbol_name() const { return d_symbol_name; } + gr::io_signature::sptr input_signature() const { return d_input_signature; } + gr::io_signature::sptr output_signature() const { return d_output_signature; } + basic_block_sptr to_basic_block(); // Needed for Python type coercion + bool alias_set() { return !d_symbol_alias.empty(); } + std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); } + pmt::pmt_t alias_pmt(){ return pmt::intern(alias()); } + void set_block_alias(std::string name); + + // ** Message passing interface ** + void message_port_register_in(pmt::pmt_t port_id); + void message_port_register_out(pmt::pmt_t port_id); + void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg); + void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target); + void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target); + + virtual bool message_port_is_hier(pmt::pmt_t port_id) { (void) port_id; std::cout << "is_hier\n"; return false; } + virtual bool message_port_is_hier_in(pmt::pmt_t port_id) { (void) port_id; std::cout << "is_hier_in\n"; return false; } + virtual bool message_port_is_hier_out(pmt::pmt_t port_id) { (void) port_id; std::cout << "is_hier_out\n"; return false; } + + /*! + * \brief Get input message port names. + * + * Returns the available input message ports for a block. The + * return object is a PMT vector that is filled with PMT symbols. + */ + pmt::pmt_t message_ports_in(); + + /*! + * \brief Get output message port names. + * + * Returns the available output message ports for a block. The + * return object is a PMT vector that is filled with PMT symbols. + */ + pmt::pmt_t message_ports_out(); + + /*! + * Accept msg, place in queue, arrange for thread to be awakened if it's not already. + */ + void _post(pmt::pmt_t which_port, pmt::pmt_t msg); + + //! is the queue empty? + //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); } + bool empty_p(pmt::pmt_t which_port) { + if(msg_queue.find(which_port) == msg_queue.end()) + throw std::runtime_error("port does not exist!"); + return msg_queue[which_port].empty(); + } + bool empty_p() { + bool rv = true; + BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue) { + rv &= msg_queue[i.first].empty(); + } + return rv; + } + + //! How many messages in the queue? + size_t nmsgs(pmt::pmt_t which_port) { + if(msg_queue.find(which_port) == msg_queue.end()) + throw std::runtime_error("port does not exist!"); + return msg_queue[which_port].size(); + } + + //| Acquires and release the mutex + void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg); + /*! + * \returns returns pmt at head of queue or pmt::pmt_t() if empty. + */ + pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port); + + /*! + * \returns returns pmt at head of queue or pmt::pmt_t() if empty. + */ + pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port); + + msg_queue_t::iterator get_iterator(pmt::pmt_t which_port) { + return msg_queue[which_port].begin(); + } + + void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it) { + msg_queue[which_port].erase(it); + } + + virtual bool has_msg_port(pmt::pmt_t which_port) { + if(msg_queue.find(which_port) != msg_queue.end()) { + return true; + } + if(pmt::dict_has_key(message_subscribers, which_port)) { + return true; + } + return false; + } + +#ifdef GR_CTRLPORT + /*! + * \brief Add an RPC variable (get or set). + * + * Using controlport, we create new getters/setters and need to + * store them. Each block has a vector to do this, and these never + * need to be accessed again once they are registered with the RPC + * backend. This function takes a + * boost::shared_sptr<rpcbasic_base> so that when the block is + * deleted, all RPC registered variables are cleaned up. + * + * \param s an rpcbasic_sptr of the new RPC variable register to store. + */ + void add_rpc_variable(rpcbasic_sptr s) + { + d_rpc_vars.push_back(s); + } +#endif /* GR_CTRLPORT */ + + /*! + * \brief Set up the RPC registered variables. + * + * This must be overloaded by a block that wants to use + * controlport. This is where rpcbasic_register_{get,set} pointers + * are created, which then get wrapped as shared pointers + * (rpcbasic_sptr(...)) and stored using add_rpc_variable. + */ + virtual void setup_rpc() {}; + + /*! + * \brief Ask if this block has been registered to the RPC. + * + * We can only register a block once, so we use this to protect us + * from calling it multiple times. + */ + bool is_rpc_set() { return d_rpc_set; } + + /*! + * \brief When the block is registered with the RPC, set this. + */ + void rpc_set() { d_rpc_set = true; } + + /*! + * \brief Confirm that ninputs and noutputs is an acceptable combination. + * + * \param ninputs number of input streams connected + * \param noutputs number of output streams connected + * + * \returns true if this is a valid configuration for this block. + * + * This function is called by the runtime system whenever the + * topology changes. Most classes do not need to override this. + * This check is in addition to the constraints specified by the + * input and output gr::io_signatures. + */ + virtual bool check_topology(int ninputs, int noutputs) { + (void)ninputs; + (void)noutputs; + return true; + } + + /*! + * \brief Set the callback that is fired when messages are available. + * + * \p msg_handler can be any kind of function pointer or function object + * that has the signature: + * <pre> + * void msg_handler(pmt::pmt msg); + * </pre> + * + * (You may want to use boost::bind to massage your callable into + * the correct form. See gr::blocks::nop for an example that sets + * up a class method as the callback.) + * + * Blocks that desire to handle messages must call this method in + * their constructors to register the handler that will be invoked + * when messages are available. + * + * If the block inherits from block, the runtime system will + * ensure that msg_handler is called in a thread-safe manner, such + * that work and msg_handler will never be called concurrently. + * This allows msg_handler to update state variables without + * having to worry about thread-safety issues with work, + * general_work or another invocation of msg_handler. + * + * If the block inherits from hier_block2, the runtime system + * will ensure that no reentrant calls are made to msg_handler. + */ + template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler) { + if(msg_queue.find(which_port) == msg_queue.end()) { + throw std::runtime_error("attempt to set_msg_handler() on bad input message port!"); + } + d_msg_handlers[which_port] = msg_handler_t(msg_handler); + } + }; + + inline bool operator<(basic_block_sptr lhs, basic_block_sptr rhs) + { + return lhs->unique_id() < rhs->unique_id(); + } + + typedef std::vector<basic_block_sptr> basic_block_vector_t; + typedef std::vector<basic_block_sptr>::iterator basic_block_viter_t; + + GR_RUNTIME_API long basic_block_ncurrently_allocated(); + + inline std::ostream &operator << (std::ostream &os, basic_block_sptr basic_block) + { + os << basic_block->name() << "(" << basic_block->unique_id() << ")"; + return os; + } + +} /* namespace gr */ + +#endif /* INCLUDED_GR_BASIC_BLOCK_H */ |