summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/lib/controlport
diff options
context:
space:
mode:
authorTom Rondeau <tom@trondeau.com>2015-10-07 07:15:56 -0400
committerTom Rondeau <tom@trondeau.com>2015-10-16 15:48:47 -0400
commit09b03871e81da2b09792a454ee3caf28bdad4c3c (patch)
tree577b2ca08026ce833963e7e494069e593b5a489f /gnuradio-runtime/lib/controlport
parentce28185909272d2aebc0e8279a4ab2c3cca16c02 (diff)
ctrlport: provides a ControlPort model that connects directly with a
block's message handler. We still need to register this in setup_rpc, but there might be a way to automate the registering of this during set_msg_handler if ControlPort is enabled.
Diffstat (limited to 'gnuradio-runtime/lib/controlport')
-rw-r--r--gnuradio-runtime/lib/controlport/rpcserver_aggregator.cc19
-rw-r--r--gnuradio-runtime/lib/controlport/thrift/gnuradio.thrift1
-rw-r--r--gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc74
3 files changed, 94 insertions, 0 deletions
diff --git a/gnuradio-runtime/lib/controlport/rpcserver_aggregator.cc b/gnuradio-runtime/lib/controlport/rpcserver_aggregator.cc
index 3ff553af69..e81a899626 100644
--- a/gnuradio-runtime/lib/controlport/rpcserver_aggregator.cc
+++ b/gnuradio-runtime/lib/controlport/rpcserver_aggregator.cc
@@ -74,6 +74,25 @@ rpcserver_aggregator::unregisterQueryCallback(const std::string &id)
unregisterQueryCallback_f<rpcmanager_base::rpcserver_booter_base_sptr, queryCallback_t>(id));
}
+
+
+void
+rpcserver_aggregator::registerHandlerCallback(const std::string &id,
+ const handlerCallback_t callback)
+{
+ std::for_each(d_serverlist.begin(), d_serverlist.end(),
+ registerHandlerCallback_f<rpcmanager_base::rpcserver_booter_base_sptr, handlerCallback_t>(id, callback));
+}
+
+void
+rpcserver_aggregator::unregisterHandlerCallback(const std::string &id)
+{
+ std::for_each(d_serverlist.begin(), d_serverlist.end(),
+ unregisterHandlerCallback_f<rpcmanager_base::rpcserver_booter_base_sptr, handlerCallback_t>(id));
+}
+
+
+
void
rpcserver_aggregator::registerServer(rpcmanager_base::rpcserver_booter_base_sptr server)
{
diff --git a/gnuradio-runtime/lib/controlport/thrift/gnuradio.thrift b/gnuradio-runtime/lib/controlport/thrift/gnuradio.thrift
index 023e9fd788..432fb5d76c 100644
--- a/gnuradio-runtime/lib/controlport/thrift/gnuradio.thrift
+++ b/gnuradio-runtime/lib/controlport/thrift/gnuradio.thrift
@@ -105,5 +105,6 @@ service ControlPort {
KnobMap getKnobs(1:KnobIDList knobs);
KnobMap getRe(1:KnobIDList knobs);
KnobPropMap properties(1:KnobIDList knobs);
+ void postMessage(1:string blk_alias, 2:string port, 3:string msg);
void shutdown();
}
diff --git a/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc b/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc
index e33fea457a..6b912bbfae 100644
--- a/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc
+++ b/gnuradio-runtime/lib/controlport/thrift/rpcserver_thrift.cc
@@ -124,6 +124,51 @@ rpcserver_thrift::unregisterQueryCallback(const std::string &id)
d_getcallbackmap.erase(iter);
}
+
+
+void
+rpcserver_thrift::registerHandlerCallback(const std::string &id,
+ const handlerCallback_t callback)
+{
+ boost::mutex::scoped_lock lock(d_callback_map_lock);
+ {
+ HandlerCallbackMap_t::const_iterator iter(d_handlercallbackmap.find(id));
+ if(iter != d_handlercallbackmap.end()) {
+ std::stringstream s;
+ s << "rpcserver_thrift:: rpcserver_thrift ERROR registering handler, already registered: "
+ << id << std::endl;
+ throw std::runtime_error(s.str().c_str());
+ }
+ }
+
+ if(DEBUG) {
+ std::cerr << "rpcserver_thrift registering handler: " << id << std::endl;
+ }
+ d_handlercallbackmap.insert(HandlerCallbackMap_t::value_type(id, callback));
+}
+
+void
+rpcserver_thrift::unregisterHandlerCallback(const std::string &id)
+{
+ boost::mutex::scoped_lock lock(d_callback_map_lock);
+ HandlerCallbackMap_t::iterator iter(d_handlercallbackmap.find(id));
+ if(iter == d_handlercallbackmap.end()) {
+ std::stringstream s;
+ s << "rpcserver_thrift:: rpcserver_thrift ERROR unregistering handler, registered: "
+ << id << std::endl;
+ throw std::runtime_error(s.str().c_str());
+ }
+
+ if(DEBUG) {
+ std::cerr << "rpcserver_thrift unregistering handler: " << id << std::endl;
+ }
+
+ d_handlercallbackmap.erase(iter);
+}
+
+
+
+
void
rpcserver_thrift::setKnobs(const GNURadio::KnobMap& knobs)
{
@@ -193,6 +238,35 @@ rpcserver_thrift::properties(GNURadio::KnobPropMap& _return,
}
}
+
+void
+rpcserver_thrift::postMessage(const std::string& alias,
+ const std::string& port,
+ const std::string& msg)
+{
+ // alias and port are received as serialized PMT strings and need to
+ // be deserialized into PMTs and then the actual info from there.
+ // The actual message (msg) is also received as a serialized PMT. We
+ // just need to get the PMT itself out of this to pass to the set_h
+ // function for handling the message post.
+
+ boost::mutex::scoped_lock lock(d_callback_map_lock);
+
+ pmt::pmt_t alias_pmt = pmt::deserialize_str(alias);
+ pmt::pmt_t port_pmt = pmt::deserialize_str(port);
+ pmt::pmt_t msg_pmt = pmt::deserialize_str(msg);
+ std::string alias_str = pmt::symbol_to_string(alias_pmt);
+ std::string port_str = pmt::symbol_to_string(port_pmt);
+ std::string iface = alias_str + "::" + port_str;
+
+ HandlerCallbackMap_t::iterator itr = d_handlercallbackmap.begin();
+ for(; itr != d_handlercallbackmap.end(); itr++) {
+ if(iface == (*itr).first) {
+ set_h((*itr).second, cur_priv, port_pmt, msg_pmt);
+ }
+ }
+}
+
void
rpcserver_thrift::shutdown() {
if (DEBUG) {