summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim O'Shea <tim.oshea753@gmail.com>2013-07-22 06:38:23 -0700
committerJohnathan Corgan <johnathan@corganlabs.com>2013-07-22 07:02:00 -0700
commitfd1f0b7d27f0112f0ec72fd5530e9fb3c607fdec (patch)
tree775c82e007b4e68a96e4f3aafa5267dbe431d5d9
parent8144572532b8bfa37af01e1264a334d2a898ea8d (diff)
runtime: fix asynch messages delivery when handler not defined
-rw-r--r--gnuradio-runtime/include/gnuradio/basic_block.h15
-rw-r--r--gnuradio-runtime/include/gnuradio/block_gateway.h5
-rw-r--r--gnuradio-runtime/lib/tpb_thread_body.cc46
3 files changed, 48 insertions, 18 deletions
diff --git a/gnuradio-runtime/include/gnuradio/basic_block.h b/gnuradio-runtime/include/gnuradio/basic_block.h
index be385465d1..30ae2ca0f0 100644
--- a/gnuradio-runtime/include/gnuradio/basic_block.h
+++ b/gnuradio-runtime/include/gnuradio/basic_block.h
@@ -118,7 +118,7 @@ namespace gr {
/*!
* \brief Tests if there is a handler attached to port \p which_port
*/
- bool has_msg_handler(pmt::pmt_t which_port) {
+ virtual bool has_msg_handler(pmt::pmt_t which_port) {
return (d_msg_handlers.find(which_port) != d_msg_handlers.end());
}
@@ -186,7 +186,6 @@ namespace gr {
void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
//! is the queue empty?
- //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); }
bool empty_p(pmt::pmt_t which_port) {
if(msg_queue.find(which_port) == msg_queue.end())
throw std::runtime_error("port does not exist!");
@@ -200,6 +199,18 @@ namespace gr {
return rv;
}
+ //! are all msg ports with handlers empty?
+ bool empty_handled_p(pmt::pmt_t which_port){
+ return (empty_p(which_port) || !has_msg_handler(which_port));
+ }
+ bool empty_handled_p() {
+ bool rv = true;
+ BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue) {
+ rv &= empty_handled_p(i.first);
+ }
+ return rv;
+ }
+
//! How many messages in the queue?
size_t nmsgs(pmt::pmt_t which_port) {
if(msg_queue.find(which_port) == msg_queue.end())
diff --git a/gnuradio-runtime/include/gnuradio/block_gateway.h b/gnuradio-runtime/include/gnuradio/block_gateway.h
index 0f328de2e5..c2d09de00d 100644
--- a/gnuradio-runtime/include/gnuradio/block_gateway.h
+++ b/gnuradio-runtime/include/gnuradio/block_gateway.h
@@ -253,6 +253,11 @@ namespace gr {
typedef std::map<pmt::pmt_t, feval_p *, pmt::comperator> msg_handlers_feval_t;
msg_handlers_feval_t d_msg_handlers_feval;
+ bool has_msg_handler(pmt::pmt_t which_port)
+ {
+ return (d_msg_handlers_feval.find(which_port) != d_msg_handlers_feval.end());
+ }
+
void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
{
// Is there a handler?
diff --git a/gnuradio-runtime/lib/tpb_thread_body.cc b/gnuradio-runtime/lib/tpb_thread_body.cc
index f9e543b0e5..f6f09a9c3e 100644
--- a/gnuradio-runtime/lib/tpb_thread_body.cc
+++ b/gnuradio-runtime/lib/tpb_thread_body.cc
@@ -61,8 +61,6 @@ namespace gr {
boost::this_thread::interruption_point();
// handle any queued up messages
- //BOOST_FOREACH( pmt::pmt_t port, block->msg_queue.keys() )
-
BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) {
// Check if we have a message handler attached before getting
// any messages. This is mostly a protection for the unknown
@@ -108,15 +106,23 @@ namespace gr {
while(!d->d_tpb.input_changed) {
// wait for input or message
- while(!d->d_tpb.input_changed && block->empty_p())
+ while(!d->d_tpb.input_changed && block->empty_handled_p())
d->d_tpb.input_cond.wait(guard);
// handle all pending messages
BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) {
- while((msg = block->delete_head_nowait(i.first))) {
- guard.unlock(); // release lock while processing msg
- block->dispatch_msg(i.first, msg);
- guard.lock();
+ if(block->has_msg_handler(i.first)) {
+ while((msg = block->delete_head_nowait(i.first))) {
+ guard.unlock(); // release lock while processing msg
+ block->dispatch_msg(i.first, msg);
+ guard.lock();
+ }
+ }
+ else {
+ // leave msg in queue if no handler is defined
+ // start dropping if we have too many
+ if(block->nmsgs(i.first) > max_nmsgs)
+ msg = block->delete_head_nowait(i.first);
}
}
if (d->done()) {
@@ -131,23 +137,31 @@ namespace gr {
gr::thread::scoped_lock guard(d->d_tpb.mutex);
while(!d->d_tpb.output_changed) {
// wait for output room or message
- while(!d->d_tpb.output_changed && block->empty_p())
+ while(!d->d_tpb.output_changed && block->empty_handled_p())
d->d_tpb.output_cond.wait(guard);
// handle all pending messages
BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) {
- while((msg = block->delete_head_nowait(i.first))) {
- guard.unlock(); // release lock while processing msg
- block->dispatch_msg(i.first,msg);
- guard.lock();
- }
- }
- }
+ if(block->has_msg_handler(i.first)) {
+ while((msg = block->delete_head_nowait(i.first))) {
+ guard.unlock(); // release lock while processing msg
+ block->dispatch_msg(i.first, msg);
+ guard.lock();
+ }
+ }
+ else {
+ // leave msg in queue if no handler is defined
+ // start dropping if we have too many
+ if(block->nmsgs(i.first) > max_nmsgs)
+ msg = block->delete_head_nowait(i.first);
+ }
+ }
+ }
}
break;
default:
- assert(0);
+ throw std::runtime_error("possible memory corruption in scheduler");
}
}
}