From 1181c2fe069075f9ceb3b66ed937115ff39aafa9 Mon Sep 17 00:00:00 2001
From: Eric Blossom <eb@comsec.com>
Date: Thu, 13 Aug 2009 21:50:06 -0700
Subject: Refactored gr_msg_accepter and gr_tpd_thread_body.

Redirected gr_msg_accepter::post into gr_block::_post based on dynamic cast.
---
 gnuradio-core/src/lib/runtime/gr_basic_block.cc    |  3 +-
 gnuradio-core/src/lib/runtime/gr_block_detail.cc   |  7 +++
 gnuradio-core/src/lib/runtime/gr_block_detail.h    |  6 +++
 gnuradio-core/src/lib/runtime/gr_msg_accepter.cc   | 17 +++++---
 gnuradio-core/src/lib/runtime/gr_msg_accepter.h    | 10 +++--
 gnuradio-core/src/lib/runtime/gr_tpb_detail.cc     | 45 ++++++++++++++++++-
 gnuradio-core/src/lib/runtime/gr_tpb_detail.h      | 30 +++++++++----
 .../src/lib/runtime/gr_tpb_thread_body.cc          | 51 ++++++++++++----------
 8 files changed, 125 insertions(+), 44 deletions(-)

(limited to 'gnuradio-core/src')

diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
index 8efa8267a4..2fa1066cb9 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
@@ -41,8 +41,7 @@ gr_basic_block_ncurrently_allocated()
 gr_basic_block::gr_basic_block(const std::string &name,
                                gr_io_signature_sptr input_signature,
                                gr_io_signature_sptr output_signature) 
-  : gr_msg_accepter(gruel::make_msg_queue(0)), // Non-blocking insert
-    d_name(name),
+  : d_name(name),
     d_input_signature(input_signature),
     d_output_signature(output_signature),
     d_unique_id(s_next_id++),
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.cc b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
index ae1ea25628..d33dfed846 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.cc
@@ -106,3 +106,10 @@ gr_block_detail::produce_each (int how_many_items)
     for (int i = 0; i < noutputs (); i++)
       d_output[i]->update_write_pointer (how_many_items);
 }
+
+
+void
+gr_block_detail::_post(pmt::pmt_t msg)
+{
+  d_tpb.insert_tail(msg);
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h
index 2856c402c7..9d63586024 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h
@@ -79,6 +79,12 @@ class gr_block_detail {
   void produce_each (int how_many_items);
 
 
+  /*!
+   * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
+   */
+  void _post(pmt::pmt_t msg);
+
+
   gr_tpb_detail			     d_tpb;	// used by thread-per-block scheduler
 
   // ----------------------------------------------------------------------------
diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
index 50b41df88b..89876ae297 100644
--- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
+++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.cc
@@ -26,11 +26,12 @@
 #include <gr_msg_accepter.h>
 #include <gr_block.h>
 #include <gr_block_detail.h>
+#include <gr_hier_block2.h>
+#include <stdexcept>
 
 using namespace pmt;
 
-gr_msg_accepter::gr_msg_accepter(gruel::msg_queue_sptr msgq)
-  : gruel::msg_accepter_msgq(msgq)
+gr_msg_accepter::gr_msg_accepter()
 {
 }
 
@@ -42,15 +43,17 @@ gr_msg_accepter::~gr_msg_accepter()
 void
 gr_msg_accepter::post(pmt_t msg)
 {
-  // Let parent class do whatever it would have
-  gruel::msg_accepter_msgq::post(msg);
-
   // Notify derived class, handled case by case
   gr_block *p = dynamic_cast<gr_block *>(this);
   if (p) { 
-    p->detail()->d_tpb.notify_msg();
+    p->detail()->_post(msg);
+    return;
+  }
+  gr_hier_block2 *p2 = dynamic_cast<gr_hier_block2 *>(this);
+  if (p2){
+    // FIXME do the right thing
     return;
   }
 
-  // Test for other derived classes and handle
+  throw std::runtime_error("unknown derived class");
 }
diff --git a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
index 2073e7ff16..79a631f3a6 100644
--- a/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
+++ b/gnuradio-core/src/lib/runtime/gr_msg_accepter.h
@@ -22,19 +22,21 @@
 #ifndef INCLUDED_GR_MSG_ACCEPTER_H
 #define INCLUDED_GR_MSG_ACCEPTER_H
 
-#include <gruel/msg_accepter_msgq.h>
+#include <gruel/msg_accepter.h>
+#include <gruel/pmt.h>
 
 /*!
  * \brief Accepts messages and inserts them into a message queue, then notifies
  * subclass gr_basic_block there is a message pending.
  */
-class gr_msg_accepter : public gruel::msg_accepter_msgq
+class gr_msg_accepter : public gruel::msg_accepter
 {
 public:
-  gr_msg_accepter(gruel::msg_queue_sptr msgq);
+  gr_msg_accepter();
   ~gr_msg_accepter();
-  
+
   void post(pmt::pmt_t msg);
+
 };
 
 #endif /* INCLUDED_GR_MSG_ACCEPTER_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
index 02e8deed88..c6311ccaa3 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2008 Free Software Foundation, Inc.
+ * Copyright 2008,2009 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
@@ -27,6 +27,8 @@
 #include <gr_block_detail.h>
 #include <gr_buffer.h>
 
+using namespace pmt;
+
 /*
  * We assume that no worker threads are ever running when the
  * graph structure is being manipulated, thus it's safe for us to poke
@@ -65,3 +67,44 @@ gr_tpb_detail::notify_neighbors(gr_block_detail *d)
   notify_downstream(d);
   notify_upstream(d);
 }
+
+void
+gr_tpb_detail::insert_tail(pmt::pmt_t msg)
+{
+  gruel::scoped_lock guard(mutex);
+
+  msg_queue.push_back(msg);
+
+  // wake up thread if BLKD_IN or BLKD_OUT
+  input_cond.notify_one();
+  output_cond.notify_one();
+}
+
+pmt_t 
+gr_tpb_detail::delete_head_nowait()
+{
+  gruel::scoped_lock guard(mutex);
+
+  if (empty_p())
+    return pmt_t();
+
+  pmt_t m(msg_queue.front());
+  msg_queue.pop_front();
+
+  return m;
+}
+
+/*
+ * Caller must already be holding the mutex
+ */
+pmt_t 
+gr_tpb_detail::delete_head_nowait_already_holding_mutex()
+{
+  if (empty_p())
+    return pmt_t();
+
+  pmt_t m(msg_queue.front());
+  msg_queue.pop_front();
+
+  return m;
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
index a1df55806e..acfa264c7c 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
@@ -22,6 +22,8 @@
 #define INCLUDED_GR_TPB_DETAIL_H
 
 #include <gruel/thread.h>
+#include <deque>
+#include <gruel/pmt.h>
 
 class gr_block_detail;
 
@@ -36,6 +38,10 @@ struct gr_tpb_detail {
   bool				output_changed;
   gruel::condition_variable	output_cond;
 
+private:
+  std::deque<pmt::pmt_t>	msg_queue;
+
+public:
   gr_tpb_detail()
     : input_changed(false), output_changed(false) { }
 
@@ -55,16 +61,23 @@ struct gr_tpb_detail {
     input_changed = false;
     output_changed = false;
   }
+  
+  //! is the queue empty?
+  bool empty_p() const { return msg_queue.empty(); }
 
-  //! Called to notify us that a message is pending in the queue
-  void notify_msg()
-  {
-    gruel::scoped_lock guard(mutex);
+  //| Acquires and release the mutex	
+  void insert_tail(pmt::pmt_t msg);
 
-    // Just wake up thread if BLKD_IN or BLKD_OUT
-    input_cond.notify_one();
-    output_cond.notify_one();
-  }
+  /*!
+   * \returns returns pmt at head of queue or pmt_t() if empty.
+   */
+  pmt::pmt_t delete_head_nowait();
+
+  /*!
+   * \returns returns pmt at head of queue or pmt_t() if empty.
+   * Caller must already be holding the mutex
+   */
+  pmt::pmt_t delete_head_nowait_already_holding_mutex();
 
 private:
 
@@ -83,6 +96,7 @@ private:
     output_changed = true;
     output_cond.notify_one();
   }
+
 };
 
 #endif /* INCLUDED_GR_TPB_DETAIL_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index c601b588c2..03eef17d93 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -35,12 +35,15 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
 
   gr_block_detail *d = block->detail().get();
   gr_block_executor::state s;
+  pmt_t msg;
+
 
   while (1){
     boost::this_thread::interruption_point();
  
-    while (!block->msg_queue()->empty_p())
-      block->handle_msg(block->msg_queue()->delete_head_nowait());
+    // handle any queued up messages
+    while ((msg = d->d_tpb.delete_head_nowait()))
+      block->handle_msg(msg);
 
     d->d_tpb.clear_changed();
     s = d_exec.run_one_iteration();
@@ -59,37 +62,41 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
       return;
 
     case gr_block_executor::BLKD_IN:		// Wait for input.
-      while (!d->d_tpb.input_changed) 
       {
-	boost::this_thread::interruption_point();
 	gruel::scoped_lock guard(d->d_tpb.mutex);
-	
-	// Block then wake on input_changed or msg arrived
-	while(!d->d_tpb.input_changed && !block->msg_queue()->empty_p())
-	  d->d_tpb.input_cond.wait(guard); 
+	while (!d->d_tpb.input_changed){
+	  
+	  // wait for input or message
+	  while(!d->d_tpb.input_changed && d->d_tpb.empty_p())
+	    d->d_tpb.input_cond.wait(guard);
 
-	// Run msgq while unlocked
-	guard.unlock();
-	while (!block->msg_queue()->empty_p())
-	  block->handle_msg(block->msg_queue()->delete_head_nowait());
+	  // handle all pending messages
+	  while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
+	    guard.unlock();			// release lock while processing msg
+	    block->handle_msg(msg);
+	    guard.lock();
+	  }
+	}
       }
       break;
 
       
     case gr_block_executor::BLKD_OUT:		// Wait for output buffer space.
-      while (!d->d_tpb.output_changed) 
       {
-	boost::this_thread::interruption_point();
 	gruel::scoped_lock guard(d->d_tpb.mutex);
+	while (!d->d_tpb.output_changed){
+	  
+	  // wait for output room or message
+	  while(!d->d_tpb.output_changed && d->d_tpb.empty_p())
+	    d->d_tpb.output_cond.wait(guard);
 
-	// Block then wake on output_changed or msg arrived
-	while(!d->d_tpb.output_changed && !block->msg_queue()->empty_p())
-	  d->d_tpb.output_cond.wait(guard); 
-
-	// Run msgq while unlocked
-	guard.unlock();
-	while (!block->msg_queue()->empty_p())
-	  block->handle_msg(block->msg_queue()->delete_head_nowait());
+	  // handle all pending messages
+	  while ((msg = d->d_tpb.delete_head_nowait_already_holding_mutex())){
+	    guard.unlock();			// release lock while processing msg
+	    block->handle_msg(msg);
+	    guard.lock();
+	  }
+	}
       }
       break;
 
-- 
cgit v1.2.3