From 7ebec8cc4b5e3a9eb480e3f954f9425d92e73a18 Mon Sep 17 00:00:00 2001
From: Tom Rondeau <trondeau@vt.edu>
Date: Thu, 7 Mar 2013 14:24:52 -0500
Subject: core: protect against popping a message off the queue if no handler
 is attached.

This mostly solves a problem with threads being launched in random
order, so a handler might not be established yet, even if there is a
message waiting. Fixes Issue #514.
---
 gnuradio-core/src/lib/runtime/gr_basic_block.h     | 23 +++++++++++++++++++---
 .../src/lib/runtime/gr_tpb_thread_body.cc          | 17 +++++++++++++++-
 2 files changed, 36 insertions(+), 4 deletions(-)

(limited to 'gnuradio-core/src/lib/runtime')

diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index b4935d8ac8..024159c4cc 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -106,6 +106,13 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_
   void set_color(vcolor color) { d_color = color; }
   vcolor color() const { return d_color; }
 
+  /*!
+   * \brief Tests if there is a handler attached to port \p which_port
+   */
+   bool has_msg_handler(pmt::pmt_t which_port) {
+     return (d_msg_handlers.find(which_port) != d_msg_handlers.end());
+   }
+
   /*
    * This function is called by the runtime system to dispatch messages.
    *
@@ -115,9 +122,10 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_
   virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
   {
     // AA Update this
-    if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
+    if(has_msg_handler(which_port)) {  // Is there a handler?
       d_msg_handlers[which_port](msg); // Yes, invoke it.
-  };
+    }
+  }
   
   // Message passing interface
   pmt::pmt_t message_subscribers;
@@ -177,9 +185,18 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_
   }
   bool empty_p() { 
     bool rv = true;
-    BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
+    BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue) {
+      rv &= msg_queue[i.first].empty();
+    }
     return rv;
   }
+
+  //! How many messages in the queue?
+  size_t nmsgs(pmt::pmt_t which_port) { 
+    if(msg_queue.find(which_port) == msg_queue.end())
+      throw std::runtime_error("port does not exist!");
+    return msg_queue[which_port].size(); 
+  }
   
   //| Acquires and release the mutex
   void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg);
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index cea374fac7..679fd15124 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -22,6 +22,7 @@
 #include <config.h>
 #endif
 #include <gr_tpb_thread_body.h>
+#include <gr_prefs.h>
 #include <iostream>
 #include <boost/thread.hpp>
 #include <gruel/pmt.h>
@@ -41,6 +42,9 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
   d->threaded = true;
   d->thread = gruel::get_current_thread_id();
 
+  gr_prefs *p = gr_prefs::singleton();
+  size_t max_nmsgs = static_cast<size_t>(p->get_long("DEFAULT", "max_messages", 100));
+
   // Set thread affinity if it was set before fg was started.
   if(block->processor_affinity().size() > 0) {
     gruel::thread_bind_to_processor(d->thread, block->processor_affinity());
@@ -54,9 +58,20 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
     
     BOOST_FOREACH( gr_basic_block::msg_queue_map_t::value_type &i, block->msg_queue )
     {
+      // Check if we have a message handler attached before getting
+      // any messages. This is mostly a protection for the unknown
+      // startup sequence of the threads.
+      if(block->has_msg_handler(i.first)) {
         while ((msg = block->delete_head_nowait(i.first))){
-            block->dispatch_msg(i.first,msg);
+          block->dispatch_msg(i.first,msg);
         }
+      }
+      else {
+        // If we don't have a handler but are building up messages,
+        // prune the queue from the front to keep memory in check.
+        if(block->nmsgs(i.first) > max_nmsgs)
+          msg = block->delete_head_nowait(i.first);
+      }
     }
 
     d->d_tpb.clear_changed();
-- 
cgit v1.2.3