summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/lib/tpb_thread_body.cc
diff options
context:
space:
mode:
authorTim O'Shea <tim.oshea753@gmail.com>2013-07-22 06:38:23 -0700
committerJohnathan Corgan <johnathan@corganlabs.com>2013-07-22 07:02:00 -0700
commitfd1f0b7d27f0112f0ec72fd5530e9fb3c607fdec (patch)
tree775c82e007b4e68a96e4f3aafa5267dbe431d5d9 /gnuradio-runtime/lib/tpb_thread_body.cc
parent8144572532b8bfa37af01e1264a334d2a898ea8d (diff)
runtime: fix asynch messages delivery when handler not defined
Diffstat (limited to 'gnuradio-runtime/lib/tpb_thread_body.cc')
-rw-r--r--gnuradio-runtime/lib/tpb_thread_body.cc46
1 files changed, 30 insertions, 16 deletions
diff --git a/gnuradio-runtime/lib/tpb_thread_body.cc b/gnuradio-runtime/lib/tpb_thread_body.cc
index f9e543b0e5..f6f09a9c3e 100644
--- a/gnuradio-runtime/lib/tpb_thread_body.cc
+++ b/gnuradio-runtime/lib/tpb_thread_body.cc
@@ -61,8 +61,6 @@ namespace gr {
boost::this_thread::interruption_point();
// handle any queued up messages
- //BOOST_FOREACH( pmt::pmt_t port, block->msg_queue.keys() )
-
BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) {
// Check if we have a message handler attached before getting
// any messages. This is mostly a protection for the unknown
@@ -108,15 +106,23 @@ namespace gr {
while(!d->d_tpb.input_changed) {
// wait for input or message
- while(!d->d_tpb.input_changed && block->empty_p())
+ while(!d->d_tpb.input_changed && block->empty_handled_p())
d->d_tpb.input_cond.wait(guard);
// handle all pending messages
BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) {
- while((msg = block->delete_head_nowait(i.first))) {
- guard.unlock(); // release lock while processing msg
- block->dispatch_msg(i.first, msg);
- guard.lock();
+ if(block->has_msg_handler(i.first)) {
+ while((msg = block->delete_head_nowait(i.first))) {
+ guard.unlock(); // release lock while processing msg
+ block->dispatch_msg(i.first, msg);
+ guard.lock();
+ }
+ }
+ else {
+ // leave msg in queue if no handler is defined
+ // start dropping if we have too many
+ if(block->nmsgs(i.first) > max_nmsgs)
+ msg = block->delete_head_nowait(i.first);
}
}
if (d->done()) {
@@ -131,23 +137,31 @@ namespace gr {
gr::thread::scoped_lock guard(d->d_tpb.mutex);
while(!d->d_tpb.output_changed) {
// wait for output room or message
- while(!d->d_tpb.output_changed && block->empty_p())
+ while(!d->d_tpb.output_changed && block->empty_handled_p())
d->d_tpb.output_cond.wait(guard);
// handle all pending messages
BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) {
- while((msg = block->delete_head_nowait(i.first))) {
- guard.unlock(); // release lock while processing msg
- block->dispatch_msg(i.first,msg);
- guard.lock();
- }
- }
- }
+ if(block->has_msg_handler(i.first)) {
+ while((msg = block->delete_head_nowait(i.first))) {
+ guard.unlock(); // release lock while processing msg
+ block->dispatch_msg(i.first, msg);
+ guard.lock();
+ }
+ }
+ else {
+ // leave msg in queue if no handler is defined
+ // start dropping if we have too many
+ if(block->nmsgs(i.first) > max_nmsgs)
+ msg = block->delete_head_nowait(i.first);
+ }
+ }
+ }
}
break;
default:
- assert(0);
+ throw std::runtime_error("possible memory corruption in scheduler");
}
}
}