path: root/gnuradio-runtime
diff options
authorTom Rondeau <>2015-10-07 07:15:56 -0400
committerTom Rondeau <>2015-10-16 15:48:47 -0400
commit09b03871e81da2b09792a454ee3caf28bdad4c3c (patch)
tree577b2ca08026ce833963e7e494069e593b5a489f /gnuradio-runtime
parentce28185909272d2aebc0e8279a4ab2c3cca16c02 (diff)
ctrlport: provides a ControlPort model that connects directly with a
block's message handler. We still need to register this in setup_rpc, but there might be a way to automate the registering of this during set_msg_handler if ControlPort is enabled.
Diffstat (limited to 'gnuradio-runtime')
11 files changed, 333 insertions, 8 deletions
diff --git a/gnuradio-runtime/include/gnuradio/rpccallbackregister_base.h b/gnuradio-runtime/include/gnuradio/rpccallbackregister_base.h
index 814749fe66..14e8772449 100644
--- a/gnuradio-runtime/include/gnuradio/rpccallbackregister_base.h
+++ b/gnuradio-runtime/include/gnuradio/rpccallbackregister_base.h
@@ -99,6 +99,7 @@ struct callbackregister_base
typedef callback_t<gr::messages::msg_accepter, gr::messages::msg_accepter_sptr> configureCallback_t;
typedef callback_t<gr::messages::msg_producer, gr::messages::msg_producer_sptr> queryCallback_t;
+ typedef callback_t<gr::messages::msg_accepter, gr::messages::msg_accepter_sptr> handlerCallback_t;
callbackregister_base() {;}
virtual ~callbackregister_base() {;}
@@ -107,6 +108,8 @@ struct callbackregister_base
virtual void unregisterConfigureCallback(const std::string &id) = 0;
virtual void registerQueryCallback(const std::string &id, const queryCallback_t callback) = 0;
virtual void unregisterQueryCallback(const std::string &id) = 0;
+ virtual void registerHandlerCallback(const std::string &id, const handlerCallback_t callback) = 0;
+ virtual void unregisterHandlerCallback(const std::string &id) = 0;
diff --git a/gnuradio-runtime/include/gnuradio/rpcregisterhelpers.h b/gnuradio-runtime/include/gnuradio/rpcregisterhelpers.h
index adf868df7d..4405c90939 100644
--- a/gnuradio-runtime/include/gnuradio/rpcregisterhelpers.h
+++ b/gnuradio-runtime/include/gnuradio/rpcregisterhelpers.h
@@ -143,6 +143,49 @@ public:
+ * RPC Handler Base Classes
+ ********************************************************************/
+ *\brief Base class for registering a ControlPort Handler. Acts as
+ * a message acceptor.
+ */
+template<typename T>
+class rpchandler_base
+ : public virtual gr::messages::msg_accepter
+ rpchandler_base(T* source, const char* handler) :
+ _source(source), _handler(handler) {;}
+ ~rpchandler_base() {;}
+ void post(pmt::pmt_t which_port, pmt::pmt_t msg) {
+ _source->post(which_port, msg);
+ }
+ T* _source;
+ const char* _handler;
+ * \brief Templated parent class for registering a ControlPort Extractor.
+ */
+template<typename T>
+class rpcbasic_handler : public virtual rpchandler_base<T>
+ rpcbasic_handler(T* source, const char* handler) :
+ rpchandler_base<T>(source, handler)
+ {;}
* RPC Specialized Extractors
@@ -1355,6 +1398,81 @@ public:
+ * \brief Registers a message handler function to post a message to a
+ * block's handler.
+ */
+template<typename T>
+class rpcbasic_register_handler : public rpcbasic_base
+ /*!
+ * \brief Adds the ability to pass a message over ControlPort.
+ *
+ * \details
+ * This makes any message handler function avialable over
+ * ControlPort. Since message handlers always take in a single PMT
+ * message input, this interface provides a very generic way of
+ * setting values in a block in a flowgraph.
+ *
+ * \param block_alias Alias of the block
+ * \param handler The name of the message port in the block
+ * \param units_ A string to describe what units to represent the variable with
+ * \param desc_ A string to describing the variable.
+ * \param minpriv_ The required minimum privilege level
+ * \param display_ The display mask
+ */
+ rpcbasic_register_handler(const std::string& block_alias,
+ const char* handler,
+ const char* units_ = "",
+ const char* desc_ = "",
+ priv_lvl_t minpriv_ = RPC_PRIVLVL_MIN,
+ DisplayType display_ = DISPNULL)
+ {
+ d_units = units_;
+ d_desc = desc_;
+ d_minpriv = minpriv_;
+ d_display = display_;
+ d_object = dynamic_cast<T*>(global_block_registry.block_lookup(pmt::intern(block_alias)).get());
+ callbackregister_base::handlerCallback_t
+ inserter(new rpcbasic_handler<T>(d_object, handler),
+ minpriv_, std::string(units_), display_, std::string(desc_),
+ 0, 0, 0);
+ std::ostringstream oss(std::ostringstream::out);
+ oss << block_alias << "::" << handler;
+ d_id = oss.str();
+ //std::cerr << "REGISTERING GET: " << d_id << " " << desc_ << std::endl;
+ rpcmanager::get()->i()->registerHandlerCallback(d_id, inserter);
+ }
+ ~rpcbasic_register_handler()
+ {
+ rpcmanager::get()->i()->unregisterHandlerCallback(d_id);
+ }
+ std::string units() const { return d_units; }
+ std::string description() const { return d_desc; }
+ priv_lvl_t privilege_level() const { return d_minpriv; }
+ DisplayType default_display() const { return d_display; }
+ void units(std::string u) { d_units = u; }
+ void description(std::string d) { d_desc = d; }
+ void privilege_level(priv_lvl_t p) { d_minpriv = p; }
+ void default_display(DisplayType d) { d_display = d; }
+ std::string d_id;
+ std::string d_units, d_desc;
+ priv_lvl_t d_minpriv;
+ DisplayType d_display;
+ T *d_object;
diff --git a/gnuradio-runtime/include/gnuradio/rpcserver_aggregator.h b/gnuradio-runtime/include/gnuradio/rpcserver_aggregator.h
index 98aae92b90..08426bb00c 100644
--- a/gnuradio-runtime/include/gnuradio/rpcserver_aggregator.h
+++ b/gnuradio-runtime/include/gnuradio/rpcserver_aggregator.h
@@ -40,6 +40,9 @@ public:
void registerQueryCallback(const std::string &id, const queryCallback_t callback);
void unregisterQueryCallback(const std::string &id);
+ void registerHandlerCallback(const std::string &id, const handlerCallback_t callback);
+ void unregisterHandlerCallback(const std::string &id);
void registerServer(rpcmanager_base::rpcserver_booter_base_sptr server);
const std::string& type();
@@ -91,6 +94,32 @@ private:
const std::string& id;
+ template<class T, typename Tcallback>
+ struct registerHandlerCallback_f: public std::unary_function<T,void>
+ {
+ registerHandlerCallback_f(const std::string &_id, const Tcallback _callback)
+ : id(_id), callback(_callback)
+ {;}
+ void operator()(T& x) { x->i()->registerHandlerCallback(id, callback); }
+ const std::string& id; const Tcallback& callback;
+ };
+ template<class T, typename Tcallback>
+ struct unregisterHandlerCallback_f: public std::unary_function<T,void>
+ {
+ unregisterHandlerCallback_f(const std::string &_id)
+ : id(_id)
+ {;}
+ void operator()(T& x) { x->i()->unregisterHandlerCallback(id); }
+ const std::string& id;
+ };
const std::string d_type;
typedef std::vector<rpcmanager_base::rpcserver_booter_base_sptr> rpcServerMap_t;
std::vector<std::string> d_registeredServers;
diff --git a/gnuradio-runtime/include/gnuradio/rpcserver_base.h b/gnuradio-runtime/include/gnuradio/rpcserver_base.h
index af0b9e762d..276dec5d1e 100644
--- a/gnuradio-runtime/include/gnuradio/rpcserver_base.h
+++ b/gnuradio-runtime/include/gnuradio/rpcserver_base.h
@@ -33,8 +33,13 @@ public:
virtual void registerConfigureCallback(const std::string &id, const configureCallback_t callback) = 0;
virtual void unregisterConfigureCallback(const std::string &id) = 0;
virtual void registerQueryCallback(const std::string &id, const queryCallback_t callback) = 0;
virtual void unregisterQueryCallback(const std::string &id) = 0;
+ virtual void registerHandlerCallback(const std::string &id, const handlerCallback_t callback) = 0;
+ virtual void unregisterHandlerCallback(const std::string &id) = 0;
virtual void setCurPrivLevel(const priv_lvl_t priv) { cur_priv = priv; }
typedef boost::shared_ptr<rpcserver_base> rpcserver_base_sptr;
diff --git a/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h b/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h
index 203be66e9a..dc7ad40161 100644
--- a/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h
+++ b/gnuradio-runtime/include/gnuradio/rpcserver_thrift.h
@@ -38,7 +38,9 @@
#define S_(x) S(x)
#define S__LINE__ S_(__LINE__)
-class rpcserver_thrift : public virtual rpcserver_base, public GNURadio::ControlPortIf
+class rpcserver_thrift
+ : public virtual rpcserver_base,
+ public GNURadio::ControlPortIf
@@ -52,6 +54,10 @@ public:
const queryCallback_t callback);
void unregisterQueryCallback(const std::string &id);
+ void registerHandlerCallback(const std::string &id,
+ const handlerCallback_t callback);
+ void unregisterHandlerCallback(const std::string &id);
void setKnobs(const GNURadio::KnobMap&);
void getKnobs(GNURadio::KnobMap&,
const GNURadio::KnobIDList&);
@@ -59,6 +65,35 @@ public:
const GNURadio::KnobIDList&);
void properties(GNURadio::KnobPropMap&,
const GNURadio::KnobIDList& knobs);
+ /*!
+ * \brief Call this to post a message to the \p port for the block
+ * identified by \p alias.
+ *
+ * The message, \p msg, is passed as a serialized PMT that is then
+ * passed to the message handler function identified by \p port to
+ * the block identified by \p alias. The \p alias and \p port
+ * values are passed as serialized PMT symbols (see
+ * pmt::intern). The message is whatever PMT format is appropriate
+ * for the message handler function.
+ *
+ * To use this function, the message handler function must have
+ * been registered (most likely in setup_rpc) in the block during
+ * construction using rpcbasic_register_handler.
+ *
+ * \param alias The alias of the block, which is used to map to the
+ * real block through the global_block_registry. Passed in
+ * as a serialized PMT symbol.
+ * \param port The name of the message port. Passed in as a
+ * serialized PMT symbol.
+ * \param msg The actual message to pass to \p port. This is a
+ * serialized PMT where the PMT is whatever form appropriate
+ * for the message handler function.
+ */
+ void postMessage(const std::string& alias,
+ const std::string& port,
+ const std::string& msg);
virtual void shutdown();
@@ -70,6 +105,28 @@ public:
typedef std::map<std::string, queryCallback_t> QueryCallbackMap_t;
QueryCallbackMap_t d_getcallbackmap;
+ typedef std::map<std::string, handlerCallback_t> HandlerCallbackMap_t;
+ HandlerCallbackMap_t d_handlercallbackmap;
+ /*!
+ * \brief Manages calling the callback function for a message handler posting.
+ */
+ void
+ set_h(const handlerCallback_t &_handlerCallback,
+ const priv_lvl_t &_cur_priv,
+ pmt::pmt_t port, pmt::pmt_t msg)
+ {
+ if(cur_priv <= _handlerCallback.priv) {
+ _handlerCallback.callback->post(port, msg);
+ }
+ else {
+ std::cerr << "Message " << _handlerCallback.description << " requires PRIVLVL <= "
+ << _handlerCallback.priv << " to set, currently at: "
+ << cur_priv << std::endl;
+ }
+ }
template<typename T, typename TMap> struct set_f
: public std::unary_function<T,void>
@@ -87,7 +144,7 @@ public:
(*iter->second.callback).post(pmt::PMT_NIL, rpcpmtconverter::To_PMT::instance(p.second));
else {
- std::cout << "Key " << p.first << " requires PRIVLVL <= "
+ std::cerr << "Key " << p.first << " requires PRIVLVL <= "
<< iter->second.priv << " to set, currently at: "
<< cur_priv << std::endl;
@@ -116,7 +173,7 @@ public:
outknobs[p] = rpcpmtconverter::from_pmt((*iter->second.callback).retrieve());
else {
- std::cout << "Key " << iter->first << " requires PRIVLVL: <= "
+ std::cerr << "Key " << iter->first << " requires PRIVLVL: <= "
<< iter->second.priv << " to get, currently at: "
<< cur_priv << std::endl;
@@ -124,7 +181,7 @@ public:
else {
std::stringstream ss;
ss << "Ctrlport Key called with unregistered key (" << p << ")\n";
- std::cout << ss.str();
+ std::cerr << ss.str();
throw apache::thrift::TApplicationException(__FILE__ " " S__LINE__);
@@ -147,7 +204,7 @@ public:
outknobs[p.first] = rpcpmtconverter::from_pmt(p.second.callback->retrieve());
else {
- std::cout << "Key " << p.first << " requires PRIVLVL <= "
+ std::cerr << "Key " << p.first << " requires PRIVLVL <= "
<< p.second.priv << " to get, currently at: "
<< cur_priv << std::endl;
@@ -182,7 +239,7 @@ public:
outknobs[p.first] = prop;
else {
- std::cout << "Key " << p.first << " requires PRIVLVL <= "
+ std::cerr << "Key " << p.first << " requires PRIVLVL <= "
<< p.second.priv << " to get, currently at: "
<< cur_priv << std::endl;
@@ -215,7 +272,7 @@ public:
outknobs[p] = prop;
else {
- std::cout << "Key " << iter->first << " requires PRIVLVL: <= "
+ std::cerr << "Key " << iter->first << " requires PRIVLVL: <= "
<< iter->second.priv << " to get, currently at: " << cur_priv << std::endl;
diff --git a/gnuradio-runtime/include/gnuradio/thrift_server_template.h b/gnuradio-runtime/include/gnuradio/thrift_server_template.h
index e2d6f63a69..632a902360 100644
--- a/gnuradio-runtime/include/gnuradio/thrift_server_template.h
+++ b/gnuradio-runtime/include/gnuradio/thrift_server_template.h
@@ -25,7 +25,6 @@
#include <gnuradio/prefs.h>
#include <gnuradio/logger.h>
-#include <gnuradio/rpcserver_thrift.h>
#include <gnuradio/thrift_application_base.h>
#include <iostream>
diff --git a/gnuradio-runtime/lib/controlport/ b/gnuradio-runtime/lib/controlport/
index 3ff553af69..e81a899626 100644
--- a/gnuradio-runtime/lib/controlport/
+++ b/gnuradio-runtime/lib/controlport/
@@ -74,6 +74,25 @@ rpcserver_aggregator::unregisterQueryCallback(const std::string &id)
unregisterQueryCallback_f<rpcmanager_base::rpcserver_booter_base_sptr, queryCallback_t>(id));
+rpcserver_aggregator::registerHandlerCallback(const std::string &id,
+ const handlerCallback_t callback)
+ std::for_each(d_serverlist.begin(), d_serverlist.end(),
+ registerHandlerCallback_f<rpcmanager_base::rpcserver_booter_base_sptr, handlerCallback_t>(id, callback));
+rpcserver_aggregator::unregisterHandlerCallback(const std::string &id)
+ std::for_each(d_serverlist.begin(), d_serverlist.end(),
+ unregisterHandlerCallback_f<rpcmanager_base::rpcserver_booter_base_sptr, handlerCallback_t>(id));
rpcserver_aggregator::registerServer(rpcmanager_base::rpcserver_booter_base_sptr server)
diff --git a/gnuradio-runtime/lib/controlport/thrift/gnuradio.thrift b/gnuradio-runtime/lib/controlport/thrift/gnuradio.thrift
index 023e9fd788..432fb5d76c 100644
--- a/gnuradio-runtime/lib/controlport/thrift/gnuradio.thrift
+++ b/gnuradio-runtime/lib/controlport/thrift/gnuradio.thrift
@@ -105,5 +105,6 @@ service ControlPort {
KnobMap getKnobs(1:KnobIDList knobs);
KnobMap getRe(1:KnobIDList knobs);
KnobPropMap properties(1:KnobIDList knobs);
+ void postMessage(1:string blk_alias, 2:string port, 3:string msg);
void shutdown();
diff --git a/gnuradio-runtime/lib/controlport/thrift/ b/gnuradio-runtime/lib/controlport/thrift/
index e33fea457a..6b912bbfae 100644
--- a/gnuradio-runtime/lib/controlport/thrift/
+++ b/gnuradio-runtime/lib/controlport/thrift/
@@ -124,6 +124,51 @@ rpcserver_thrift::unregisterQueryCallback(const std::string &id)
+rpcserver_thrift::registerHandlerCallback(const std::string &id,
+ const handlerCallback_t callback)
+ boost::mutex::scoped_lock lock(d_callback_map_lock);
+ {
+ HandlerCallbackMap_t::const_iterator iter(d_handlercallbackmap.find(id));
+ if(iter != d_handlercallbackmap.end()) {
+ std::stringstream s;
+ s << "rpcserver_thrift:: rpcserver_thrift ERROR registering handler, already registered: "
+ << id << std::endl;
+ throw std::runtime_error(s.str().c_str());
+ }
+ }
+ if(DEBUG) {
+ std::cerr << "rpcserver_thrift registering handler: " << id << std::endl;
+ }
+ d_handlercallbackmap.insert(HandlerCallbackMap_t::value_type(id, callback));
+rpcserver_thrift::unregisterHandlerCallback(const std::string &id)
+ boost::mutex::scoped_lock lock(d_callback_map_lock);
+ HandlerCallbackMap_t::iterator iter(d_handlercallbackmap.find(id));
+ if(iter == d_handlercallbackmap.end()) {
+ std::stringstream s;
+ s << "rpcserver_thrift:: rpcserver_thrift ERROR unregistering handler, registered: "
+ << id << std::endl;
+ throw std::runtime_error(s.str().c_str());
+ }
+ if(DEBUG) {
+ std::cerr << "rpcserver_thrift unregistering handler: " << id << std::endl;
+ }
+ d_handlercallbackmap.erase(iter);
rpcserver_thrift::setKnobs(const GNURadio::KnobMap& knobs)
@@ -193,6 +238,35 @@ rpcserver_thrift::properties(GNURadio::KnobPropMap& _return,
+rpcserver_thrift::postMessage(const std::string& alias,
+ const std::string& port,
+ const std::string& msg)
+ // alias and port are received as serialized PMT strings and need to
+ // be deserialized into PMTs and then the actual info from there.
+ // The actual message (msg) is also received as a serialized PMT. We
+ // just need to get the PMT itself out of this to pass to the set_h
+ // function for handling the message post.
+ boost::mutex::scoped_lock lock(d_callback_map_lock);
+ pmt::pmt_t alias_pmt = pmt::deserialize_str(alias);
+ pmt::pmt_t port_pmt = pmt::deserialize_str(port);
+ pmt::pmt_t msg_pmt = pmt::deserialize_str(msg);
+ std::string alias_str = pmt::symbol_to_string(alias_pmt);
+ std::string port_str = pmt::symbol_to_string(port_pmt);
+ std::string iface = alias_str + "::" + port_str;
+ HandlerCallbackMap_t::iterator itr = d_handlercallbackmap.begin();
+ for(; itr != d_handlercallbackmap.end(); itr++) {
+ if(iface == (*itr).first) {
+ set_h((*itr).second, cur_priv, port_pmt, msg_pmt);
+ }
+ }
rpcserver_thrift::shutdown() {
if (DEBUG) {
diff --git a/gnuradio-runtime/python/gnuradio/ctrlport/ b/gnuradio-runtime/python/gnuradio/ctrlport/
index e14cc0cea7..1b129534c9 100644
--- a/gnuradio-runtime/python/gnuradio/ctrlport/
+++ b/gnuradio-runtime/python/gnuradio/ctrlport/
@@ -105,6 +105,9 @@ class RPCConnection(object):
def getRe(self,*args):
raise exceptions.NotImplementedError()
+ def postMessage(self,*args):
+ raise exceptions.NotImplementedError()
def setKnobs(self,*args):
raise exceptions.NotImplementedError()
diff --git a/gnuradio-runtime/python/gnuradio/ctrlport/ b/gnuradio-runtime/python/gnuradio/ctrlport/
index 9a2a302af5..522c74117b 100644
--- a/gnuradio-runtime/python/gnuradio/ctrlport/
+++ b/gnuradio-runtime/python/gnuradio/ctrlport/
@@ -27,6 +27,7 @@ from thrift.protocol import TBinaryProtocol
from gnuradio.ctrlport.GNURadio import ControlPort
from gnuradio.ctrlport import RPCConnection
from gnuradio import gr
+import pmt
import sys
class ThriftRadioClient:
@@ -196,6 +197,22 @@ class RPCConnectionThrift(RPCConnection.RPCConnection):
def shutdown(self):
+ def postMessage(self, blk_alias, port, msg):
+ '''
+ blk_alias: the alias of the block we are posting the message
+ to; must have an open message port named 'port'.
+ Provide as a string.
+ port: The name of the message port we are sending the message to.
+ Provide as a string.
+ msg: The actual message. Provide this as a PMT of the form
+ right for the message port.
+ The alias and port names are converted to PMT symbols and
+ serialized. The msg is already a PMT and so just serialized.
+ '''
+ pmt.serialize_str(pmt.intern(port)),
+ pmt.serialize_str(msg));
def printProperties(self, props):
info = ""
info += "Item:\t\t{0}\n".format(props.description)