diff options
author | Tim O'Shea <tim.oshea753@gmail.com> | 2013-07-22 06:38:23 -0700 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2013-07-22 07:02:00 -0700 |
commit | fd1f0b7d27f0112f0ec72fd5530e9fb3c607fdec (patch) | |
tree | 775c82e007b4e68a96e4f3aafa5267dbe431d5d9 /gnuradio-runtime/lib/tpb_thread_body.cc | |
parent | 8144572532b8bfa37af01e1264a334d2a898ea8d (diff) |
runtime: fix asynch messages delivery when handler not defined
Diffstat (limited to 'gnuradio-runtime/lib/tpb_thread_body.cc')
-rw-r--r-- | gnuradio-runtime/lib/tpb_thread_body.cc | 46 |
1 files changed, 30 insertions, 16 deletions
diff --git a/gnuradio-runtime/lib/tpb_thread_body.cc b/gnuradio-runtime/lib/tpb_thread_body.cc index f9e543b0e5..f6f09a9c3e 100644 --- a/gnuradio-runtime/lib/tpb_thread_body.cc +++ b/gnuradio-runtime/lib/tpb_thread_body.cc @@ -61,8 +61,6 @@ namespace gr { boost::this_thread::interruption_point(); // handle any queued up messages - //BOOST_FOREACH( pmt::pmt_t port, block->msg_queue.keys() ) - BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) { // Check if we have a message handler attached before getting // any messages. This is mostly a protection for the unknown @@ -108,15 +106,23 @@ namespace gr { while(!d->d_tpb.input_changed) { // wait for input or message - while(!d->d_tpb.input_changed && block->empty_p()) + while(!d->d_tpb.input_changed && block->empty_handled_p()) d->d_tpb.input_cond.wait(guard); // handle all pending messages BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) { - while((msg = block->delete_head_nowait(i.first))) { - guard.unlock(); // release lock while processing msg - block->dispatch_msg(i.first, msg); - guard.lock(); + if(block->has_msg_handler(i.first)) { + while((msg = block->delete_head_nowait(i.first))) { + guard.unlock(); // release lock while processing msg + block->dispatch_msg(i.first, msg); + guard.lock(); + } + } + else { + // leave msg in queue if no handler is defined + // start dropping if we have too many + if(block->nmsgs(i.first) > max_nmsgs) + msg = block->delete_head_nowait(i.first); } } if (d->done()) { @@ -131,23 +137,31 @@ namespace gr { gr::thread::scoped_lock guard(d->d_tpb.mutex); while(!d->d_tpb.output_changed) { // wait for output room or message - while(!d->d_tpb.output_changed && block->empty_p()) + while(!d->d_tpb.output_changed && block->empty_handled_p()) d->d_tpb.output_cond.wait(guard); // handle all pending messages BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) { - while((msg = block->delete_head_nowait(i.first))) { - guard.unlock(); // release lock while processing msg - block->dispatch_msg(i.first,msg); - guard.lock(); - } - } - } + if(block->has_msg_handler(i.first)) { + while((msg = block->delete_head_nowait(i.first))) { + guard.unlock(); // release lock while processing msg + block->dispatch_msg(i.first, msg); + guard.lock(); + } + } + else { + // leave msg in queue if no handler is defined + // start dropping if we have too many + if(block->nmsgs(i.first) > max_nmsgs) + msg = block->delete_head_nowait(i.first); + } + } + } } break; default: - assert(0); + throw std::runtime_error("possible memory corruption in scheduler"); } } } |