diff options
author | Tom Rondeau <tom@trondeau.com> | 2015-10-07 07:15:56 -0400 |
---|---|---|
committer | Tom Rondeau <tom@trondeau.com> | 2015-10-16 15:48:47 -0400 |
commit | 09b03871e81da2b09792a454ee3caf28bdad4c3c (patch) | |
tree | 577b2ca08026ce833963e7e494069e593b5a489f | |
parent | ce28185909272d2aebc0e8279a4ab2c3cca16c02 (diff) |
ctrlport: provides a ControlPort model that connects directly with a
block's message handler.
We still need to register this in setup_rpc, but there might be a way
to automate the registering of this during set_msg_handler if
ControlPort is enabled.
13 files changed, 348 insertions, 8 deletions
diff --git a/gnuradio-runtime/include/gnuradio/rpccallbackregister_base.h b/gnuradio-runtime/include/gnuradio/rpccallbackregister_base.h index 814749fe66..14e8772449 100644 --- a/gnuradio-runtime/include/gnuradio/rpccallbackregister_base.h +++ b/gnuradio-runtime/include/gnuradio/rpccallbackregister_base.h @@ -99,6 +99,7 @@ struct callbackregister_base typedef callback_t<gr::messages::msg_accepter, gr::messages::msg_accepter_sptr> configureCallback_t; typedef callback_t<gr::messages::msg_producer, gr::messages::msg_producer_sptr> queryCallback_t; + typedef callback_t<gr::messages::msg_accepter, gr::messages::msg_accepter_sptr> handlerCallback_t; callbackregister_base() {;} virtual ~callbackregister_base() {;} @@ -107,6 +108,8 @@ struct callbackregister_base virtual void unregisterConfigureCallback(const std::string &id) = 0; virtual void registerQueryCallback(const std::string &id, const queryCallback_t callback) = 0; virtual void unregisterQueryCallback(const std::string &id) = 0; + virtual void registerHandlerCallback(const std::string &id, const handlerCallback_t callback) = 0; + virtual void unregisterHandlerCallback(const std::string &id) = 0; }; #endif /* RPCCALLBACKREGISTER_BASE_H */ diff --git a/gnuradio-runtime/include/gnuradio/rpcregisterhelpers.h b/gnuradio-runtime/include/gnuradio/rpcregisterhelpers.h index adf868df7d..4405c90939 100644 --- a/gnuradio-runtime/include/gnuradio/rpcregisterhelpers.h +++ b/gnuradio-runtime/include/gnuradio/rpcregisterhelpers.h @@ -143,6 +143,49 @@ public: }; + +/********************************************************************* + * RPC Handler Base Classes + ********************************************************************/ + +/*! + *\brief Base class for registering a ControlPort Handler. Acts as + * a message acceptor. + */ +template<typename T> +class rpchandler_base + : public virtual gr::messages::msg_accepter +{ +public: + rpchandler_base(T* source, const char* handler) : + _source(source), _handler(handler) {;} + ~rpchandler_base() {;} + + void post(pmt::pmt_t which_port, pmt::pmt_t msg) { + _source->post(which_port, msg); + } + +protected: + T* _source; + const char* _handler; +}; + + +/*! + * \brief Templated parent class for registering a ControlPort Extractor. + */ +template<typename T> +class rpcbasic_handler : public virtual rpchandler_base<T> +{ +public: + rpcbasic_handler(T* source, const char* handler) : + rpchandler_base<T>(source, handler) + {;} +}; + + + + /********************************************************************* * RPC Specialized Extractors ********************************************************************/ @@ -1355,6 +1398,81 @@ public: }; +/*! + * \brief Registers a message handler function to post a message to a + * block's handler. + */ +template<typename T> +class rpcbasic_register_handler : public rpcbasic_base +{ +public: + + /*! + * \brief Adds the ability to pass a message over ControlPort. + * + * \details + * This makes any message handler function avialable over + * ControlPort. Since message handlers always take in a single PMT + * message input, this interface provides a very generic way of + * setting values in a block in a flowgraph. + * + * \param block_alias Alias of the block + * \param handler The name of the message port in the block + * \param units_ A string to describe what units to represent the variable with + * \param desc_ A string to describing the variable. + * \param minpriv_ The required minimum privilege level + * \param display_ The display mask + */ + rpcbasic_register_handler(const std::string& block_alias, + const char* handler, + const char* units_ = "", + const char* desc_ = "", + priv_lvl_t minpriv_ = RPC_PRIVLVL_MIN, + DisplayType display_ = DISPNULL) + { + d_units = units_; + d_desc = desc_; + d_minpriv = minpriv_; + d_display = display_; + d_object = dynamic_cast<T*>(global_block_registry.block_lookup(pmt::intern(block_alias)).get()); +#ifdef GR_RPCSERVER_ENABLED + callbackregister_base::handlerCallback_t + inserter(new rpcbasic_handler<T>(d_object, handler), + minpriv_, std::string(units_), display_, std::string(desc_), + 0, 0, 0); + std::ostringstream oss(std::ostringstream::out); + oss << block_alias << "::" << handler; + d_id = oss.str(); + //std::cerr << "REGISTERING GET: " << d_id << " " << desc_ << std::endl; + rpcmanager::get()->i()->registerHandlerCallback(d_id, inserter); +#endif + } + + ~rpcbasic_register_handler() + { +#ifdef GR_RPCSERVER_ENABLED + rpcmanager::get()->i()->unregisterHandlerCallback(d_id); +#endif + } + + std::string units() const { return d_units; } + std::string description() const { return d_desc; } + priv_lvl_t privilege_level() const { return d_minpriv; } + DisplayType default_display() const { return d_display; } + + void units(std::string u) { d_units = u; } + void description(std::string d) { d_desc = d; } + void privilege_level(priv_lvl_t p) { d_minpriv = p; } + void default_display(DisplayType d) { d_display = d; } + +private: + std::string d_id; + std::string d_units, d_desc; + priv_lvl_t d_minpriv; + DisplayType d_display; + T *d_object; +}; + #endif diff --git a/gnuradio-runtime/include/gnuradio/rpcserver_aggregator.h b/gnuradio-runtime/include/gnuradio/rpcserver_aggregator.h index 98aae92b90..08426bb00c 100644 --- a/gnuradio-runtime/include/gnuradio/rpcserver_aggregator.h +++ b/gnuradio-runtime/include/gnuradio/rpcserver_aggregator.h @@ -40,6 +40,9 @@ public: void registerQueryCallback(const std::string &id, const queryCallback_t callback); void unregisterQueryCallback(const std::string &id); + void registerHandlerCallback(const std::string &id, const handlerCallback_t callback); + void unregisterHandlerCallback(const std::string &id); + void registerServer(rpcmanager_base::rpcserver_booter_base_sptr server); const std::string& type(); @@ -91,6 +94,32 @@ private: const std::string& id; }; + + + template<class T, typename Tcallback> + struct registerHandlerCallback_f: public std::unary_function<T,void> + { + registerHandlerCallback_f(const std::string &_id, const Tcallback _callback) + : id(_id), callback(_callback) + {;} + + void operator()(T& x) { x->i()->registerHandlerCallback(id, callback); } + const std::string& id; const Tcallback& callback; + }; + + template<class T, typename Tcallback> + struct unregisterHandlerCallback_f: public std::unary_function<T,void> + { + unregisterHandlerCallback_f(const std::string &_id) + : id(_id) + {;} + + void operator()(T& x) { x->i()->unregisterHandlerCallback(id); } + const std::string& id; + }; + + + const std::string d_type; typedef std::vector<rpcmanager_base::rpcserver_booter_base_sptr> rpcServerMap_t; std::vector<std::string> d_registeredServers; diff --git a/gnuradio-runtime/include/gnuradio/rpcserver_base.h b/gnuradio-runtime/include/gnuradio/rpcserver_base.h index af0b9e762d..276dec5d1e 100644 --- a/gnuradio-runtime/include/gnuradio/rpcserver_base.h +++ b/gnuradio-runtime/include/gnuradio/rpcserver_base.h @@ -33,8 +33,13 @@ public: virtual void registerConfigureCallback(const std::string &id, const configureCallback_t callback) = 0; virtual void unregisterConfigureCallback(const std::string &id) = 0; + virtual void registerQueryCallback(const std::string &id, const queryCallback_t callback) = 0; virtual void unregisterQueryCallback(const std::string &id) = 0; + + virtual void registerHandlerCallback(const std::string &id, const handlerCallback_t callback) = 0; + virtual void unregisterHandlerCallback(const std::string &id) = 0; + virtual void setCurPrivLevel(const priv_lvl_t priv) { cur_priv = priv; } typedef boost::shared_ptr<rpcserver_base> rpcserver_base_sptr; diff --git a/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h b/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h index 203be66e9a..dc7ad40161 100644 --- a/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h +++ b/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h @@ -38,7 +38,9 @@ #define S_(x) S(x) #define S__LINE__ S_(__LINE__) -class rpcserver_thrift : public virtual rpcserver_base, public GNURadio::ControlPortIf +class rpcserver_thrift + : public virtual rpcserver_base, + public GNURadio::ControlPortIf { public: rpcserver_thrift(); @@ -52,6 +54,10 @@ public: const queryCallback_t callback); void unregisterQueryCallback(const std::string &id); + void registerHandlerCallback(const std::string &id, + const handlerCallback_t callback); + void unregisterHandlerCallback(const std::string &id); + void setKnobs(const GNURadio::KnobMap&); void getKnobs(GNURadio::KnobMap&, const GNURadio::KnobIDList&); @@ -59,6 +65,35 @@ public: const GNURadio::KnobIDList&); void properties(GNURadio::KnobPropMap&, const GNURadio::KnobIDList& knobs); + + /*! + * \brief Call this to post a message to the \p port for the block + * identified by \p alias. + * + * The message, \p msg, is passed as a serialized PMT that is then + * passed to the message handler function identified by \p port to + * the block identified by \p alias. The \p alias and \p port + * values are passed as serialized PMT symbols (see + * pmt::intern). The message is whatever PMT format is appropriate + * for the message handler function. + * + * To use this function, the message handler function must have + * been registered (most likely in setup_rpc) in the block during + * construction using rpcbasic_register_handler. + * + * \param alias The alias of the block, which is used to map to the + * real block through the global_block_registry. Passed in + * as a serialized PMT symbol. + * \param port The name of the message port. Passed in as a + * serialized PMT symbol. + * \param msg The actual message to pass to \p port. This is a + * serialized PMT where the PMT is whatever form appropriate + * for the message handler function. + */ + void postMessage(const std::string& alias, + const std::string& port, + const std::string& msg); + virtual void shutdown(); private: @@ -70,6 +105,28 @@ public: typedef std::map<std::string, queryCallback_t> QueryCallbackMap_t; QueryCallbackMap_t d_getcallbackmap; + typedef std::map<std::string, handlerCallback_t> HandlerCallbackMap_t; + HandlerCallbackMap_t d_handlercallbackmap; + + /*! + * \brief Manages calling the callback function for a message handler posting. + */ + void + set_h(const handlerCallback_t &_handlerCallback, + const priv_lvl_t &_cur_priv, + pmt::pmt_t port, pmt::pmt_t msg) + { + if(cur_priv <= _handlerCallback.priv) { + _handlerCallback.callback->post(port, msg); + } + else { + std::cerr << "Message " << _handlerCallback.description << " requires PRIVLVL <= " + << _handlerCallback.priv << " to set, currently at: " + << cur_priv << std::endl; + } + } + + template<typename T, typename TMap> struct set_f : public std::unary_function<T,void> { @@ -87,7 +144,7 @@ public: (*iter->second.callback).post(pmt::PMT_NIL, rpcpmtconverter::To_PMT::instance(p.second)); } else { - std::cout << "Key " << p.first << " requires PRIVLVL <= " + std::cerr << "Key " << p.first << " requires PRIVLVL <= " << iter->second.priv << " to set, currently at: " << cur_priv << std::endl; } @@ -116,7 +173,7 @@ public: outknobs[p] = rpcpmtconverter::from_pmt((*iter->second.callback).retrieve()); } else { - std::cout << "Key " << iter->first << " requires PRIVLVL: <= " + std::cerr << "Key " << iter->first << " requires PRIVLVL: <= " << iter->second.priv << " to get, currently at: " << cur_priv << std::endl; } @@ -124,7 +181,7 @@ public: else { std::stringstream ss; ss << "Ctrlport Key called with unregistered key (" << p << ")\n"; - std::cout << ss.str(); + std::cerr << ss.str(); throw apache::thrift::TApplicationException(__FILE__ " " S__LINE__); } } @@ -147,7 +204,7 @@ public: outknobs[p.first] = rpcpmtconverter::from_pmt(p.second.callback->retrieve()); } else { - std::cout << "Key " << p.first << " requires PRIVLVL <= " + std::cerr << "Key " << p.first << " requires PRIVLVL <= " << p.second.priv << " to get, currently at: " << cur_priv << std::endl; } @@ -182,7 +239,7 @@ public: outknobs[p.first] = prop; } else { - std::cout << "Key " << p.first << " requires PRIVLVL <= " + std::cerr << "Key " << p.first << " requires PRIVLVL <= " << p.second.priv << " to get, currently at: " << cur_priv << std::endl; } @@ -215,7 +272,7 @@ public: outknobs[p] = prop; } else { - std::cout << "Key " << iter->first << " requires PRIVLVL: <= " + std::cerr << "Key " << iter->first << " requires PRIVLVL: <= " << iter->second.priv << " to get, currently at: " << cur_priv << std::endl; } } diff --git a/gnuradio-runtime/include/gnuradio/thrift_server_template.h b/gnuradio-runtime/include/gnuradio/thrift_server_template.h index e2d6f63a69..632a902360 100644 --- a/gnuradio-runtime/include/gnuradio/thrift_server_template.h +++ b/gnuradio-runtime/include/gnuradio/thrift_server_template.h @@ -25,7 +25,6 @@ #include <gnuradio/prefs.h> #include <gnuradio/logger.h> -#include <gnuradio/rpcserver_thrift.h> #include <gnuradio/thrift_application_base.h> #include <iostream> diff --git a/gnuradio-runtime/lib/controlport/rpcserver_aggregator.cc b/gnuradio-runtime/lib/controlport/rpcserver_aggregator.cc index 3ff553af69..e81a899626 100644 --- a/gnuradio-runtime/lib/controlport/rpcserver_aggregator.cc +++ b/gnuradio-runtime/lib/controlport/rpcserver_aggregator.cc @@ -74,6 +74,25 @@ rpcserver_aggregator::unregisterQueryCallback(const std::string &id) unregisterQueryCallback_f<rpcmanager_base::rpcserver_booter_base_sptr, queryCallback_t>(id)); } + + +void +rpcserver_aggregator::registerHandlerCallback(const std::string &id, + const handlerCallback_t callback) +{ + std::for_each(d_serverlist.begin(), d_serverlist.end(), + registerHandlerCallback_f<rpcmanager_base::rpcserver_booter_base_sptr, handlerCallback_t>(id, callback)); +} + +void +rpcserver_aggregator::unregisterHandlerCallback(const std::string &id) +{ + std::for_each(d_serverlist.begin(), d_serverlist.end(), + unregisterHandlerCallback_f<rpcmanager_base::rpcserver_booter_base_sptr, handlerCallback_t>(id)); +} + + + void rpcserver_aggregator::registerServer(rpcmanager_base::rpcserver_booter_base_sptr server) { diff --git a/gnuradio-runtime/lib/controlport/thrift/gnuradio.thrift b/gnuradio-runtime/lib/controlport/thrift/gnuradio.thrift index 023e9fd788..432fb5d76c 100644 --- a/gnuradio-runtime/lib/controlport/thrift/gnuradio.thrift +++ b/gnuradio-runtime/lib/controlport/thrift/gnuradio.thrift @@ -105,5 +105,6 @@ service ControlPort { KnobMap getKnobs(1:KnobIDList knobs); KnobMap getRe(1:KnobIDList knobs); KnobPropMap properties(1:KnobIDList knobs); + void postMessage(1:string blk_alias, 2:string port, 3:string msg); void shutdown(); } diff --git a/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc b/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc index e33fea457a..6b912bbfae 100644 --- a/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc +++ b/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc @@ -124,6 +124,51 @@ rpcserver_thrift::unregisterQueryCallback(const std::string &id) d_getcallbackmap.erase(iter); } + + +void +rpcserver_thrift::registerHandlerCallback(const std::string &id, + const handlerCallback_t callback) +{ + boost::mutex::scoped_lock lock(d_callback_map_lock); + { + HandlerCallbackMap_t::const_iterator iter(d_handlercallbackmap.find(id)); + if(iter != d_handlercallbackmap.end()) { + std::stringstream s; + s << "rpcserver_thrift:: rpcserver_thrift ERROR registering handler, already registered: " + << id << std::endl; + throw std::runtime_error(s.str().c_str()); + } + } + + if(DEBUG) { + std::cerr << "rpcserver_thrift registering handler: " << id << std::endl; + } + d_handlercallbackmap.insert(HandlerCallbackMap_t::value_type(id, callback)); +} + +void +rpcserver_thrift::unregisterHandlerCallback(const std::string &id) +{ + boost::mutex::scoped_lock lock(d_callback_map_lock); + HandlerCallbackMap_t::iterator iter(d_handlercallbackmap.find(id)); + if(iter == d_handlercallbackmap.end()) { + std::stringstream s; + s << "rpcserver_thrift:: rpcserver_thrift ERROR unregistering handler, registered: " + << id << std::endl; + throw std::runtime_error(s.str().c_str()); + } + + if(DEBUG) { + std::cerr << "rpcserver_thrift unregistering handler: " << id << std::endl; + } + + d_handlercallbackmap.erase(iter); +} + + + + void rpcserver_thrift::setKnobs(const GNURadio::KnobMap& knobs) { @@ -193,6 +238,35 @@ rpcserver_thrift::properties(GNURadio::KnobPropMap& _return, } } + +void +rpcserver_thrift::postMessage(const std::string& alias, + const std::string& port, + const std::string& msg) +{ + // alias and port are received as serialized PMT strings and need to + // be deserialized into PMTs and then the actual info from there. + // The actual message (msg) is also received as a serialized PMT. We + // just need to get the PMT itself out of this to pass to the set_h + // function for handling the message post. + + boost::mutex::scoped_lock lock(d_callback_map_lock); + + pmt::pmt_t alias_pmt = pmt::deserialize_str(alias); + pmt::pmt_t port_pmt = pmt::deserialize_str(port); + pmt::pmt_t msg_pmt = pmt::deserialize_str(msg); + std::string alias_str = pmt::symbol_to_string(alias_pmt); + std::string port_str = pmt::symbol_to_string(port_pmt); + std::string iface = alias_str + "::" + port_str; + + HandlerCallbackMap_t::iterator itr = d_handlercallbackmap.begin(); + for(; itr != d_handlercallbackmap.end(); itr++) { + if(iface == (*itr).first) { + set_h((*itr).second, cur_priv, port_pmt, msg_pmt); + } + } +} + void rpcserver_thrift::shutdown() { if (DEBUG) { diff --git a/gnuradio-runtime/python/gnuradio/ctrlport/RPCConnection.py b/gnuradio-runtime/python/gnuradio/ctrlport/RPCConnection.py index e14cc0cea7..1b129534c9 100644 --- a/gnuradio-runtime/python/gnuradio/ctrlport/RPCConnection.py +++ b/gnuradio-runtime/python/gnuradio/ctrlport/RPCConnection.py @@ -105,6 +105,9 @@ class RPCConnection(object): def getRe(self,*args): raise exceptions.NotImplementedError() + def postMessage(self,*args): + raise exceptions.NotImplementedError() + def setKnobs(self,*args): raise exceptions.NotImplementedError() diff --git a/gnuradio-runtime/python/gnuradio/ctrlport/RPCConnectionThrift.py b/gnuradio-runtime/python/gnuradio/ctrlport/RPCConnectionThrift.py index 9a2a302af5..522c74117b 100644 --- a/gnuradio-runtime/python/gnuradio/ctrlport/RPCConnectionThrift.py +++ b/gnuradio-runtime/python/gnuradio/ctrlport/RPCConnectionThrift.py @@ -27,6 +27,7 @@ from thrift.protocol import TBinaryProtocol from gnuradio.ctrlport.GNURadio import ControlPort from gnuradio.ctrlport import RPCConnection from gnuradio import gr +import pmt import sys class ThriftRadioClient: @@ -196,6 +197,22 @@ class RPCConnectionThrift(RPCConnection.RPCConnection): def shutdown(self): self.thriftclient.radio.shutdown() + def postMessage(self, blk_alias, port, msg): + ''' + blk_alias: the alias of the block we are posting the message + to; must have an open message port named 'port'. + Provide as a string. + port: The name of the message port we are sending the message to. + Provide as a string. + msg: The actual message. Provide this as a PMT of the form + right for the message port. + The alias and port names are converted to PMT symbols and + serialized. The msg is already a PMT and so just serialized. + ''' + self.thriftclient.radio.postMessage(pmt.serialize_str(pmt.intern(blk_alias)), + pmt.serialize_str(pmt.intern(port)), + pmt.serialize_str(msg)); + def printProperties(self, props): info = "" info += "Item:\t\t{0}\n".format(props.description) diff --git a/gr-blocks/lib/copy_impl.cc b/gr-blocks/lib/copy_impl.cc index 02848369d1..acac576075 100644 --- a/gr-blocks/lib/copy_impl.cc +++ b/gr-blocks/lib/copy_impl.cc @@ -99,5 +99,18 @@ namespace gr { return n; } + + void + copy_impl::setup_rpc() + { +#ifdef GR_CTRLPORT + add_rpc_variable( + rpcbasic_sptr(new rpcbasic_register_handler<copy>( + alias(), "en", + "", "Enable", + RPC_PRIVLVL_MIN, DISPNULL))); +#endif /* GR_CTRLPORT */ + } + } /* namespace blocks */ } /* namespace gr */ diff --git a/gr-blocks/lib/copy_impl.h b/gr-blocks/lib/copy_impl.h index 925efb2153..ac7126749b 100644 --- a/gr-blocks/lib/copy_impl.h +++ b/gr-blocks/lib/copy_impl.h @@ -43,6 +43,8 @@ namespace gr { void handle_enable(pmt::pmt_t msg); + void setup_rpc(); + void set_enabled(bool enable) { d_enabled = enable; } bool enabled() const { return d_enabled;} |