diff options
author | Tim O'Shea <tim.oshea753@gmail.com> | 2013-07-22 06:38:23 -0700 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2013-07-22 07:02:00 -0700 |
commit | fd1f0b7d27f0112f0ec72fd5530e9fb3c607fdec (patch) | |
tree | 775c82e007b4e68a96e4f3aafa5267dbe431d5d9 | |
parent | 8144572532b8bfa37af01e1264a334d2a898ea8d (diff) |
runtime: fix asynch messages delivery when handler not defined
-rw-r--r-- | gnuradio-runtime/include/gnuradio/basic_block.h | 15 | ||||
-rw-r--r-- | gnuradio-runtime/include/gnuradio/block_gateway.h | 5 | ||||
-rw-r--r-- | gnuradio-runtime/lib/tpb_thread_body.cc | 46 |
3 files changed, 48 insertions, 18 deletions
diff --git a/gnuradio-runtime/include/gnuradio/basic_block.h b/gnuradio-runtime/include/gnuradio/basic_block.h index be385465d1..30ae2ca0f0 100644 --- a/gnuradio-runtime/include/gnuradio/basic_block.h +++ b/gnuradio-runtime/include/gnuradio/basic_block.h @@ -118,7 +118,7 @@ namespace gr { /*! * \brief Tests if there is a handler attached to port \p which_port */ - bool has_msg_handler(pmt::pmt_t which_port) { + virtual bool has_msg_handler(pmt::pmt_t which_port) { return (d_msg_handlers.find(which_port) != d_msg_handlers.end()); } @@ -186,7 +186,6 @@ namespace gr { void _post(pmt::pmt_t which_port, pmt::pmt_t msg); //! is the queue empty? - //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); } bool empty_p(pmt::pmt_t which_port) { if(msg_queue.find(which_port) == msg_queue.end()) throw std::runtime_error("port does not exist!"); @@ -200,6 +199,18 @@ namespace gr { return rv; } + //! are all msg ports with handlers empty? + bool empty_handled_p(pmt::pmt_t which_port){ + return (empty_p(which_port) || !has_msg_handler(which_port)); + } + bool empty_handled_p() { + bool rv = true; + BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue) { + rv &= empty_handled_p(i.first); + } + return rv; + } + //! How many messages in the queue? size_t nmsgs(pmt::pmt_t which_port) { if(msg_queue.find(which_port) == msg_queue.end()) diff --git a/gnuradio-runtime/include/gnuradio/block_gateway.h b/gnuradio-runtime/include/gnuradio/block_gateway.h index 0f328de2e5..c2d09de00d 100644 --- a/gnuradio-runtime/include/gnuradio/block_gateway.h +++ b/gnuradio-runtime/include/gnuradio/block_gateway.h @@ -253,6 +253,11 @@ namespace gr { typedef std::map<pmt::pmt_t, feval_p *, pmt::comperator> msg_handlers_feval_t; msg_handlers_feval_t d_msg_handlers_feval; + bool has_msg_handler(pmt::pmt_t which_port) + { + return (d_msg_handlers_feval.find(which_port) != d_msg_handlers_feval.end()); + } + void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg) { // Is there a handler? diff --git a/gnuradio-runtime/lib/tpb_thread_body.cc b/gnuradio-runtime/lib/tpb_thread_body.cc index f9e543b0e5..f6f09a9c3e 100644 --- a/gnuradio-runtime/lib/tpb_thread_body.cc +++ b/gnuradio-runtime/lib/tpb_thread_body.cc @@ -61,8 +61,6 @@ namespace gr { boost::this_thread::interruption_point(); // handle any queued up messages - //BOOST_FOREACH( pmt::pmt_t port, block->msg_queue.keys() ) - BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) { // Check if we have a message handler attached before getting // any messages. This is mostly a protection for the unknown @@ -108,15 +106,23 @@ namespace gr { while(!d->d_tpb.input_changed) { // wait for input or message - while(!d->d_tpb.input_changed && block->empty_p()) + while(!d->d_tpb.input_changed && block->empty_handled_p()) d->d_tpb.input_cond.wait(guard); // handle all pending messages BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) { - while((msg = block->delete_head_nowait(i.first))) { - guard.unlock(); // release lock while processing msg - block->dispatch_msg(i.first, msg); - guard.lock(); + if(block->has_msg_handler(i.first)) { + while((msg = block->delete_head_nowait(i.first))) { + guard.unlock(); // release lock while processing msg + block->dispatch_msg(i.first, msg); + guard.lock(); + } + } + else { + // leave msg in queue if no handler is defined + // start dropping if we have too many + if(block->nmsgs(i.first) > max_nmsgs) + msg = block->delete_head_nowait(i.first); } } if (d->done()) { @@ -131,23 +137,31 @@ namespace gr { gr::thread::scoped_lock guard(d->d_tpb.mutex); while(!d->d_tpb.output_changed) { // wait for output room or message - while(!d->d_tpb.output_changed && block->empty_p()) + while(!d->d_tpb.output_changed && block->empty_handled_p()) d->d_tpb.output_cond.wait(guard); // handle all pending messages BOOST_FOREACH(basic_block::msg_queue_map_t::value_type &i, block->msg_queue) { - while((msg = block->delete_head_nowait(i.first))) { - guard.unlock(); // release lock while processing msg - block->dispatch_msg(i.first,msg); - guard.lock(); - } - } - } + if(block->has_msg_handler(i.first)) { + while((msg = block->delete_head_nowait(i.first))) { + guard.unlock(); // release lock while processing msg + block->dispatch_msg(i.first, msg); + guard.lock(); + } + } + else { + // leave msg in queue if no handler is defined + // start dropping if we have too many + if(block->nmsgs(i.first) > max_nmsgs) + msg = block->delete_head_nowait(i.first); + } + } + } } break; default: - assert(0); + throw std::runtime_error("possible memory corruption in scheduler"); } } } |