From 2c8ea58e4d76f54c98d71d3fcc64bc29da490908 Mon Sep 17 00:00:00 2001
From: eb <eb@221aa14e-8319-0410-a670-987f0aec2ac5>
Date: Tue, 19 Aug 2008 23:09:56 +0000
Subject: Merged features/mp-sched -r8915:9335 into the trunk.  The trunk now
 contains the SMP aware scheduler.  This changeset introduces a dependency on
 boost 1.35 or later. See source:gnuradio/trunk/README.building-boost for
 additional info.

git-svn-id: http://gnuradio.org/svn/gnuradio/trunk@9336 221aa14e-8319-0410-a670-987f0aec2ac5
---
 gnuradio-core/src/lib/runtime/Makefile.am          |  20 +-
 gnuradio-core/src/lib/runtime/gr_block.cc          |   8 +
 gnuradio-core/src/lib/runtime/gr_block.h           |   6 +-
 gnuradio-core/src/lib/runtime/gr_block_detail.h    |  11 +-
 gnuradio-core/src/lib/runtime/gr_block_executor.cc | 329 +++++++++++++++++++++
 gnuradio-core/src/lib/runtime/gr_block_executor.h  |  69 +++++
 gnuradio-core/src/lib/runtime/gr_buffer.cc         |  31 +-
 gnuradio-core/src/lib/runtime/gr_buffer.h          |  71 ++++-
 gnuradio-core/src/lib/runtime/gr_buffer.i          |   8 +-
 gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc |  48 ++-
 gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h  |   9 +-
 gnuradio-core/src/lib/runtime/gr_flowgraph.h       |   4 +-
 .../src/lib/runtime/gr_hier_block2_detail.cc       |   2 +-
 gnuradio-core/src/lib/runtime/gr_scheduler.cc      |  33 +++
 gnuradio-core/src/lib/runtime/gr_scheduler.h       |  64 ++++
 gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc  |  87 ++++++
 gnuradio-core/src/lib/runtime/gr_scheduler_sts.h   |  62 ++++
 .../src/lib/runtime/gr_scheduler_thread.cc         | 110 -------
 .../src/lib/runtime/gr_scheduler_thread.h          |  59 ----
 gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc  |  95 ++++++
 gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h   |  60 ++++
 .../lib/runtime/gr_single_threaded_scheduler.cc    |  12 +-
 gnuradio-core/src/lib/runtime/gr_top_block.cc      |   3 +-
 gnuradio-core/src/lib/runtime/gr_top_block_impl.cc | 135 +++++----
 gnuradio-core/src/lib/runtime/gr_top_block_impl.h  |  32 +-
 .../src/lib/runtime/gr_top_block_impl_sts.cc       | 128 --------
 .../src/lib/runtime/gr_top_block_impl_sts.h        |  55 ----
 gnuradio-core/src/lib/runtime/gr_tpb_detail.cc     |  67 +++++
 gnuradio-core/src/lib/runtime/gr_tpb_detail.h      |  81 +++++
 .../src/lib/runtime/gr_tpb_thread_body.cc          |  76 +++++
 gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h |  45 +++
 gnuradio-core/src/lib/runtime/qa_gr_buffer.cc      |  14 +-
 32 files changed, 1336 insertions(+), 498 deletions(-)
 create mode 100644 gnuradio-core/src/lib/runtime/gr_block_executor.cc
 create mode 100644 gnuradio-core/src/lib/runtime/gr_block_executor.h
 create mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler.cc
 create mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler.h
 create mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc
 create mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler_sts.h
 delete mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler_thread.cc
 delete mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler_thread.h
 create mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc
 create mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h
 delete mode 100644 gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.cc
 delete mode 100644 gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.h
 create mode 100644 gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
 create mode 100644 gnuradio-core/src/lib/runtime/gr_tpb_detail.h
 create mode 100644 gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
 create mode 100644 gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h

(limited to 'gnuradio-core/src/lib/runtime')

diff --git a/gnuradio-core/src/lib/runtime/Makefile.am b/gnuradio-core/src/lib/runtime/Makefile.am
index 550031b944..b21b324128 100644
--- a/gnuradio-core/src/lib/runtime/Makefile.am
+++ b/gnuradio-core/src/lib/runtime/Makefile.am
@@ -21,7 +21,7 @@
 
 include $(top_srcdir)/Makefile.common
 
-AM_CPPFLAGS = $(STD_DEFINES_AND_INCLUDES) $(CPPUNIT_INCLUDES) $(WITH_INCLUDES)
+AM_CPPFLAGS = $(STD_DEFINES_AND_INCLUDES) $(CPPUNIT_INCLUDES) $(GRUEL_INCLUDES) $(WITH_INCLUDES)
 
 noinst_LTLIBRARIES = libruntime.la libruntime-qa.la
 
@@ -35,6 +35,7 @@ libruntime_la_SOURCES = 			\
 	gr_flat_flowgraph.cc			\
 	gr_block.cc				\
 	gr_block_detail.cc			\
+	gr_block_executor.cc			\
 	gr_hier_block2.cc			\
 	gr_hier_block2_detail.cc		\
 	gr_buffer.cc				\
@@ -48,16 +49,19 @@ libruntime_la_SOURCES = 			\
 	gr_pagesize.cc				\
 	gr_preferences.cc			\
 	gr_realtime.cc				\
-	gr_scheduler_thread.cc			\
+	gr_scheduler.cc				\
+	gr_scheduler_sts.cc			\
+	gr_scheduler_tpb.cc			\
 	gr_single_threaded_scheduler.cc		\
 	gr_sptr_magic.cc			\
 	gr_sync_block.cc			\
 	gr_sync_decimator.cc			\
 	gr_sync_interpolator.cc			\
+	gr_tmp_path.cc				\
 	gr_top_block.cc				\
 	gr_top_block_impl.cc			\
-	gr_top_block_impl_sts.cc		\
-	gr_tmp_path.cc				\
+	gr_tpb_detail.cc			\
+	gr_tpb_thread_body.cc			\
 	gr_vmcircbuf.cc				\
 	gr_vmcircbuf_mmap_shm_open.cc		\
 	gr_vmcircbuf_mmap_tmpfile.cc		\
@@ -82,6 +86,7 @@ grinclude_HEADERS = 				\
 	gr_flat_flowgraph.h			\
 	gr_block.h				\
 	gr_block_detail.h			\
+	gr_block_executor.h			\
 	gr_hier_block2.h			\
 	gr_hier_block2_detail.h			\
 	gr_buffer.h				\
@@ -97,7 +102,9 @@ grinclude_HEADERS = 				\
 	gr_preferences.h			\
 	gr_realtime.h				\
 	gr_runtime_types.h			\
-	gr_scheduler_thread.h			\
+	gr_scheduler.h				\
+	gr_scheduler_sts.h			\
+	gr_scheduler_tpb.h			\
 	gr_select_handler.h			\
 	gr_single_threaded_scheduler.h		\
 	gr_sptr_magic.h				\
@@ -106,7 +113,8 @@ grinclude_HEADERS = 				\
 	gr_sync_interpolator.h			\
 	gr_top_block.h				\
 	gr_top_block_impl.h			\
-	gr_top_block_impl_sts.h			\
+	gr_tpb_detail.h				\
+	gr_tpb_thread_body.h			\
 	gr_timer.h				\
 	gr_tmp_path.h				\
 	gr_types.h				\
diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc
index 0a8fb92c2d..7c2e9901b0 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -110,3 +110,11 @@ gr_block::fixed_rate_noutput_to_ninput(int noutput)
 {
   throw std::runtime_error("Unimplemented");
 }
+
+std::ostream&
+operator << (std::ostream& os, const gr_block *m)
+{
+  os << "<gr_block " << m->name() << " (" << m->unique_id() << ")>";
+  return os;
+}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h
index 79237ee83b..437b610b45 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -214,9 +214,13 @@ class gr_block : public gr_basic_block {
 typedef std::vector<gr_block_sptr> gr_block_vector_t;
 typedef std::vector<gr_block_sptr>::iterator gr_block_viter_t;
 
-inline gr_block_sptr make_gr_block_sptr(gr_basic_block_sptr p)
+inline gr_block_sptr cast_to_block_sptr(gr_basic_block_sptr p)
 {
   return boost::dynamic_pointer_cast<gr_block, gr_basic_block>(p);
 }
 
+
+std::ostream&
+operator << (std::ostream& os, const gr_block *m);
+
 #endif /* INCLUDED_GR_BLOCK_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h
index a3b7731c01..2856c402c7 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h
@@ -24,6 +24,7 @@
 #define INCLUDED_GR_BLOCK_DETAIL_H
 
 #include <gr_runtime_types.h>
+#include <gr_tpb_detail.h>
 #include <stdexcept>
 
 /*!
@@ -34,7 +35,6 @@
  * of almost all users of GNU Radio.  This decoupling also means that
  * we can make changes to the guts without having to recompile everything.
  */
-
 class gr_block_detail {
  public:
   ~gr_block_detail ();
@@ -73,8 +73,14 @@ class gr_block_detail {
    */
   void consume_each (int how_many_items);
 
+  /*!
+   * \brief Tell the scheduler \p how_many_items were produced on each output stream.
+   */
   void produce_each (int how_many_items);
 
+
+  gr_tpb_detail			     d_tpb;	// used by thread-per-block scheduler
+
   // ----------------------------------------------------------------------------
 
  private:
@@ -84,8 +90,11 @@ class gr_block_detail {
   std::vector<gr_buffer_sptr>	     d_output;
   bool                               d_done;
 
+
   gr_block_detail (unsigned int ninputs, unsigned int noutputs);
 
+  friend class gr_tpb_detail;
+
   friend gr_block_detail_sptr
   gr_make_block_detail (unsigned int ninputs, unsigned int noutputs);
 };
diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.cc b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
new file mode 100644
index 0000000000..fd3a916d4a
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
@@ -0,0 +1,329 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gr_block_executor.h>
+#include <gr_block.h>
+#include <gr_block_detail.h>
+#include <gr_buffer.h>
+#include <boost/thread.hpp>
+#include <iostream>
+#include <limits>
+#include <assert.h>
+#include <stdio.h>
+
+// must be defined to either 0 or 1
+#define ENABLE_LOGGING 0
+
+#if (ENABLE_LOGGING)
+#define LOG(x) do { x; } while(0)
+#else
+#define LOG(x) do {;} while(0)
+#endif
+
+static int which_scheduler  = 0;
+
+inline static unsigned int
+round_up (unsigned int n, unsigned int multiple)
+{
+  return ((n + multiple - 1) / multiple) * multiple;
+}
+
+inline static unsigned int
+round_down (unsigned int n, unsigned int multiple)
+{
+  return (n / multiple) * multiple;
+}
+
+//
+// Return minimum available write space in all our downstream buffers
+// or -1 if we're output blocked and the output we're blocked
+// on is done.
+//
+static int
+min_available_space (gr_block_detail *d, int output_multiple)
+{
+  int	min_space = std::numeric_limits<int>::max();
+
+  for (int i = 0; i < d->noutputs (); i++){
+    gr_buffer::scoped_lock guard(*d->output(i)->mutex());
+#if 0
+    int n = round_down(d->output(i)->space_available(), output_multiple);
+#else
+    int n = round_down(std::min(d->output(i)->space_available(),
+				d->output(i)->bufsize()/2),
+		       output_multiple);
+#endif
+    if (n == 0){			// We're blocked on output.
+      if (d->output(i)->done()){	// Downstream is done, therefore we're done.
+	return -1;
+      }
+      return 0;
+    }
+    min_space = std::min (min_space, n);
+  }
+  return min_space;
+}
+
+
+
+gr_block_executor::gr_block_executor (gr_block_sptr block)
+  : d_block(block), d_log(0)
+{
+  if (ENABLE_LOGGING){
+    char name[100];
+    snprintf(name, sizeof(name), "sst-%03d.log", which_scheduler++);
+    d_log = new std::ofstream(name);
+    std::unitbuf(*d_log);		// make it unbuffered...
+    *d_log << "gr_block_executor: "
+	   << d_block << std::endl;
+  }
+
+  d_block->start();			// enable any drivers, etc.
+}
+
+gr_block_executor::~gr_block_executor ()
+{
+  if (ENABLE_LOGGING)
+    delete d_log;
+
+  d_block->stop();			// stop any drivers, etc.
+}
+
+gr_block_executor::state
+gr_block_executor::run_one_iteration()
+{
+  int			noutput_items;
+  int			max_items_avail;
+
+  gr_block		*m = d_block.get();
+  gr_block_detail	*d = m->detail().get();
+
+  LOG(*d_log << std::endl << m);
+
+  if (d->done()){
+    assert(0);
+    return DONE;
+  }
+
+  if (d->source_p ()){
+    d_ninput_items_required.resize (0);
+    d_ninput_items.resize (0);
+    d_input_items.resize (0);
+    d_input_done.resize(0);
+    d_output_items.resize (d->noutputs ());
+
+    // determine the minimum available output space
+    noutput_items = min_available_space (d, m->output_multiple ());
+    LOG(*d_log << " source\n  noutput_items = " << noutput_items << std::endl);
+    if (noutput_items == -1)		// we're done
+      goto were_done;
+
+    if (noutput_items == 0){		// we're output blocked
+      LOG(*d_log << "  BLKD_OUT\n");
+      return BLKD_OUT;
+    }
+
+    goto setup_call_to_work;		// jump to common code
+  }
+
+  else if (d->sink_p ()){
+    d_ninput_items_required.resize (d->ninputs ());
+    d_ninput_items.resize (d->ninputs ());
+    d_input_items.resize (d->ninputs ());
+    d_input_done.resize(d->ninputs());
+    d_output_items.resize (0);
+    LOG(*d_log << " sink\n");
+
+    max_items_avail = 0;
+    for (int i = 0; i < d->ninputs (); i++){
+      {
+	/*
+	 * Acquire the mutex and grab local copies of items_available and done.
+	 */
+	gr_buffer::scoped_lock guard(*d->input(i)->mutex());
+	d_ninput_items[i] = d->input(i)->items_available();
+	d_input_done[i] = d->input(i)->done();
+      }
+
+      LOG(*d_log << "  d_ninput_items[" << i << "] = " << d_ninput_items[i] << std::endl);
+      LOG(*d_log << "  d_input_done[" << i << "] = " << d_input_done[i] << std::endl);
+      
+      if (d_ninput_items[i] < m->output_multiple() && d_input_done[i])
+	goto were_done;
+	
+      max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
+    }
+
+    // take a swag at how much output we can sink
+    noutput_items = (int) (max_items_avail * m->relative_rate ());
+    noutput_items = round_down (noutput_items, m->output_multiple ());
+    LOG(*d_log << "  max_items_avail = " << max_items_avail << std::endl);
+    LOG(*d_log << "  noutput_items = " << noutput_items << std::endl);
+
+    if (noutput_items == 0){	// we're blocked on input
+      LOG(*d_log << "  BLKD_IN\n");
+      return BLKD_IN;
+    }
+
+    goto try_again;		// Jump to code shared with regular case.
+  }
+
+  else {
+    // do the regular thing
+    d_ninput_items_required.resize (d->ninputs ());
+    d_ninput_items.resize (d->ninputs ());
+    d_input_items.resize (d->ninputs ());
+    d_input_done.resize(d->ninputs());
+    d_output_items.resize (d->noutputs ());
+
+    max_items_avail = 0;
+    for (int i = 0; i < d->ninputs (); i++){
+      {
+	/*
+	 * Acquire the mutex and grab local copies of items_available and done.
+	 */
+	gr_buffer::scoped_lock guard(*d->input(i)->mutex());
+	d_ninput_items[i] = d->input(i)->items_available ();
+	d_input_done[i] = d->input(i)->done();
+      }
+      max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
+    }
+
+    // determine the minimum available output space
+    noutput_items = min_available_space (d, m->output_multiple ());
+    if (ENABLE_LOGGING){
+      *d_log << " regular ";
+      if (m->relative_rate() >= 1.0)
+	*d_log << "1:" << m->relative_rate() << std::endl;
+      else
+	*d_log << 1.0/m->relative_rate() << ":1\n";
+      *d_log << "  max_items_avail = " << max_items_avail << std::endl;
+      *d_log << "  noutput_items = " << noutput_items << std::endl;
+    }
+    if (noutput_items == -1)		// we're done
+      goto were_done;
+
+    if (noutput_items == 0){		// we're output blocked
+      LOG(*d_log << "  BLKD_OUT\n");
+      return BLKD_OUT;
+    }
+
+  try_again:
+    if (m->fixed_rate()){
+      // try to work it forward starting with max_items_avail.
+      // We want to try to consume all the input we've got.
+      int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
+      reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
+      if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
+	noutput_items = reqd_noutput_items;
+    }
+
+    // ask the block how much input they need to produce noutput_items
+    m->forecast (noutput_items, d_ninput_items_required);
+
+    // See if we've got sufficient input available
+
+    int i;
+    for (i = 0; i < d->ninputs (); i++)
+      if (d_ninput_items_required[i] > d_ninput_items[i])	// not enough
+	break;
+
+    if (i < d->ninputs ()){			// not enough input on input[i]
+      // if we can, try reducing the size of our output request
+      if (noutput_items > m->output_multiple ()){
+	noutput_items /= 2;
+	noutput_items = round_up (noutput_items, m->output_multiple ());
+	goto try_again;
+      }
+
+      // We're blocked on input
+      LOG(*d_log << "  BLKD_IN\n");
+      if (d_input_done[i]) 	// If the upstream block is done, we're done
+	goto were_done;
+
+      // Is it possible to ever fulfill this request?
+      if (d_ninput_items_required[i] > d->input(i)->max_possible_items_available ()){
+	// Nope, never going to happen...
+	std::cerr << "\nsched: <gr_block " << m->name()
+		  << " (" << m->unique_id() << ")>"
+		  << " is requesting more input data\n"
+		  << "  than we can provide.\n"
+		  << "  ninput_items_required = "
+		  << d_ninput_items_required[i] << "\n"
+		  << "  max_possible_items_available = "
+		  << d->input(i)->max_possible_items_available() << "\n"
+		  << "  If this is a filter, consider reducing the number of taps.\n";
+	goto were_done;
+      }
+
+      return BLKD_IN;
+    }
+
+    // We've got enough data on each input to produce noutput_items.
+    // Finish setting up the call to work.
+
+    for (int i = 0; i < d->ninputs (); i++)
+      d_input_items[i] = d->input(i)->read_pointer();
+
+  setup_call_to_work:
+
+    for (int i = 0; i < d->noutputs (); i++)
+      d_output_items[i] = d->output(i)->write_pointer();
+
+    // Do the actual work of the block
+    int n = m->general_work (noutput_items, d_ninput_items,
+			     d_input_items, d_output_items);
+    LOG(*d_log << "  general_work: noutput_items = " << noutput_items
+	<< " result = " << n << std::endl);
+
+    if (n == -1)		// block is done
+      goto were_done;
+
+    d->produce_each (n);	// advance write pointers
+    if (n > 0)
+      return READY;
+
+    // We didn't produce any output even though we called general_work.
+    // We have (most likely) consumed some input.
+
+    // If this is a source, it's broken.
+    if (d->source_p()){
+      std::cerr << "gr_block_executor: source " << m
+		<< " returned 0 from work.  We're marking it DONE.\n";
+      // FIXME maybe we ought to raise an exception...
+      goto were_done;
+    }
+
+    // Have the caller try again...
+    return READY_NO_OUTPUT;
+  }
+  assert (0);
+    
+ were_done:
+  LOG(*d_log << "  were_done\n");
+  d->set_done (true);
+  return DONE;
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.h b/gnuradio-core/src/lib/runtime/gr_block_executor.h
new file mode 100644
index 0000000000..41b5ede7c8
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_block_executor.h
@@ -0,0 +1,69 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_BLOCK_EXECUTOR_H
+#define INCLUDED_GR_BLOCK_EXECUTOR_H
+
+#include <gr_runtime_types.h>
+#include <fstream>
+
+//class gr_block_executor;
+//typedef boost::shared_ptr<gr_block_executor>	gr_block_executor_sptr;
+
+
+/*!
+ * \brief Manage the execution of a single block.
+ * \ingroup internal
+ */
+
+class gr_block_executor {
+protected:
+  gr_block_sptr			d_block;	// The block we're trying to run
+  std::ofstream	       	       *d_log;
+
+  // These are allocated here so we don't have to on each iteration
+
+  gr_vector_int			d_ninput_items_required;
+  gr_vector_int			d_ninput_items;
+  gr_vector_const_void_star	d_input_items;
+  std::vector<bool>		d_input_done;
+  gr_vector_void_star		d_output_items;
+
+ public:
+  gr_block_executor(gr_block_sptr block);
+  ~gr_block_executor ();
+
+  enum state {
+    READY,	      // We made progress; everything's cool.
+    READY_NO_OUTPUT,  // We consumed some input, but produced no output.
+    BLKD_IN,	      // no progress; we're blocked waiting for input data.
+    BLKD_OUT,	      // no progress; we're blocked waiting for output buffer space.
+    DONE,	      // we're done; don't call me again.
+  };
+
+  /*
+   * \brief Run one iteration.
+   */
+  state run_one_iteration();
+};
+
+#endif /* INCLUDED_GR_BLOCK_EXECUTOR_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.cc b/gnuradio-core/src/lib/runtime/gr_buffer.cc
index 77f0c7c43d..31a471ea75 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.cc
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.cc
@@ -77,10 +77,10 @@ minimum_buffer_items (long type_size, long page_size)
 }
 
 
-gr_buffer::gr_buffer (int nitems, size_t sizeof_item)
+gr_buffer::gr_buffer (int nitems, size_t sizeof_item, gr_block_sptr link)
   : d_base (0), d_bufsize (0), d_vmcircbuf (0),
-    d_sizeof_item (sizeof_item), d_write_index (0),
-    d_done (false)
+    d_sizeof_item (sizeof_item), d_link(link),
+    d_write_index (0), d_done (false)
 {
   if (!allocate_buffer (nitems, sizeof_item))
     throw std::bad_alloc ();
@@ -89,9 +89,9 @@ gr_buffer::gr_buffer (int nitems, size_t sizeof_item)
 }
 
 gr_buffer_sptr 
-gr_make_buffer (int nitems, size_t sizeof_item)
+gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link)
 {
-  return gr_buffer_sptr (new gr_buffer (nitems, sizeof_item));
+  return gr_buffer_sptr (new gr_buffer (nitems, sizeof_item, link));
 }
 
 gr_buffer::~gr_buffer ()
@@ -146,7 +146,7 @@ gr_buffer::allocate_buffer (int nitems, size_t sizeof_item)
 
 
 int
-gr_buffer::space_available () const
+gr_buffer::space_available ()
 {
   if (d_readers.empty ())
     return d_bufsize - 1;	// See comment below
@@ -175,18 +175,27 @@ gr_buffer::write_pointer ()
 void
 gr_buffer::update_write_pointer (int nitems)
 {
+  scoped_lock	guard(*mutex());
   d_write_index = index_add (d_write_index, nitems);
 }
 
+void
+gr_buffer::set_done (bool done)
+{
+  scoped_lock	guard(*mutex());
+  d_done = done;
+}
+
 gr_buffer_reader_sptr
-gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload)
+gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link)
 {
   if (nzero_preload < 0)
     throw std::invalid_argument("gr_buffer_add_reader: nzero_preload must be >= 0");
 
   gr_buffer_reader_sptr r (new gr_buffer_reader (buf,
 						 buf->index_sub(buf->d_write_index,
-								nzero_preload)));
+								nzero_preload),
+						 link));
   buf->d_readers.push_back (r.get ());
 
   return r;
@@ -214,8 +223,9 @@ gr_buffer_ncurrently_allocated ()
 
 // ----------------------------------------------------------------------------
 
-gr_buffer_reader::gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index)
-  : d_buffer (buffer), d_read_index (read_index)
+gr_buffer_reader::gr_buffer_reader(gr_buffer_sptr buffer, unsigned int read_index,
+				   gr_block_sptr link)
+  : d_buffer(buffer), d_read_index(read_index), d_link(link)
 {
   s_buffer_reader_count++;
 }
@@ -241,6 +251,7 @@ gr_buffer_reader::read_pointer ()
 void
 gr_buffer_reader::update_read_pointer (int nitems)
 {
+  scoped_lock	guard(*mutex());
   d_read_index = d_buffer->index_add (d_read_index, nitems);
 }
 
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.h b/gnuradio-core/src/lib/runtime/gr_buffer.h
index cf578c89dd..75063cc6a1 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.h
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.h
@@ -24,6 +24,8 @@
 #define INCLUDED_GR_BUFFER_H
 
 #include <gr_runtime_types.h>
+#include <boost/weak_ptr.hpp>
+#include <boost/thread.hpp>
 
 class gr_vmcircbuf;
 
@@ -33,8 +35,12 @@ class gr_vmcircbuf;
  * The total size of the buffer will be rounded up to a system
  * dependent boundary.  This is typically the system page size, but
  * under MS windows is 64KB.
+ *
+ * \param nitems is the minimum number of items the buffer will hold.
+ * \param sizeof_item is the size of an item in bytes.
+ * \param link is the block that writes to this buffer.
  */
-gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item);
+gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link=gr_block_sptr());
 
 
 /*!
@@ -43,12 +49,20 @@ gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item);
  */
 class gr_buffer {
  public:
+
+  typedef boost::unique_lock<boost::mutex>  scoped_lock;
+
   virtual ~gr_buffer ();
 
   /*!
    * \brief return number of items worth of space available for writing
    */
-  int space_available () const;
+  int space_available ();
+
+  /*!
+   * \brief return size of this buffer in items
+   */
+  int bufsize() const { return d_bufsize; }
 
   /*!
    * \brief return pointer to write buffer.
@@ -63,17 +77,26 @@ class gr_buffer {
    */
   void update_write_pointer (int nitems);
 
-
-  void set_done (bool done)   { d_done = done; }
+  void set_done (bool done);
   bool done () const { return d_done; }
 
+  /*!
+   * \brief Return the block that writes to this buffer.
+   */
+  gr_block_sptr link() { return gr_block_sptr(d_link); }
+
+  size_t nreaders() const { return d_readers.size(); }
+  gr_buffer_reader* reader(size_t index) { return d_readers[index]; }
+
+  boost::mutex *mutex() { return &d_mutex; }
+
   // -------------------------------------------------------------------------
 
  private:
 
   friend class gr_buffer_reader;
-  friend gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item);
-  friend gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload);
+  friend gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link);
+  friend gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link);
 
  protected:
   char				       *d_base;		// base address of buffer
@@ -81,8 +104,14 @@ class gr_buffer {
  private:
   gr_vmcircbuf			       *d_vmcircbuf;
   size_t	 			d_sizeof_item;	// in bytes
-  unsigned int				d_write_index;	// in items [0,d_bufsize)
   std::vector<gr_buffer_reader *>	d_readers;
+  boost::weak_ptr<gr_block>		d_link;		// block that writes to this buffer
+
+  //
+  // The mutex protects d_write_index, d_done and the d_read_index's in the buffer readers.
+  //
+  boost::mutex				d_mutex;
+  unsigned int				d_write_index;	// in items [0,d_bufsize)
   bool					d_done;
   
   unsigned
@@ -116,11 +145,15 @@ class gr_buffer {
    *
    * Allocate a buffer that holds at least \p nitems of size \p sizeof_item.
    *
+   * \param nitems is the minimum number of items the buffer will hold.
+   * \param sizeof_item is the size of an item in bytes.
+   * \param link is the block that writes to this buffer.
+   *
    * The total size of the buffer will be rounded up to a system
    * dependent boundary.  This is typically the system page size, but
    * under MS windows is 64KB.
    */
-  gr_buffer (int nitems, size_t sizeof_item);
+  gr_buffer (int nitems, size_t sizeof_item, gr_block_sptr link);
 
   /*!
    * \brief disassociate \p reader from this buffer
@@ -132,8 +165,10 @@ class gr_buffer {
 /*!
  * \brief create a new gr_buffer_reader and attach it to buffer \p buf
  * \param nzero_preload -- number of zero items to "preload" into buffer.
+ * \param link is the block that reads from the buffer using this gr_buffer_reader.
  */
-gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload);
+gr_buffer_reader_sptr 
+gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link=gr_block_sptr());
 
 //! returns # of gr_buffers currently allocated
 long gr_buffer_ncurrently_allocated ();
@@ -147,8 +182,10 @@ long gr_buffer_ncurrently_allocated ();
  */
 
 class gr_buffer_reader {
-
  public:
+
+  typedef gr_buffer::scoped_lock scoped_lock;
+
   ~gr_buffer_reader ();
 
   /*!
@@ -183,19 +220,29 @@ class gr_buffer_reader {
   void set_done (bool done)   { d_buffer->set_done (done); }
   bool done () const { return d_buffer->done (); }
 
+  boost::mutex *mutex() { return d_buffer->mutex(); }
+
+
+  /*!
+   * \brief Return the block that reads via this reader.
+   */
+  gr_block_sptr link() { return gr_block_sptr(d_link); }
+
   // -------------------------------------------------------------------------
 
  private:
 
   friend class gr_buffer;
-  friend gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload);
+  friend gr_buffer_reader_sptr 
+  gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link);
 
 
   gr_buffer_sptr		d_buffer;
   unsigned int			d_read_index;	// in items [0,d->buffer.d_bufsize)
+  boost::weak_ptr<gr_block>	d_link;		// block that reads via this buffer reader
 
   //! constructor is private.  Use gr_buffer::add_reader to create instances
-  gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index);
+  gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index, gr_block_sptr link);
 };
 
 //! returns # of gr_buffer_readers currently allocated
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.i b/gnuradio-core/src/lib/runtime/gr_buffer.i
index 38e1d945da..4c1c5afae5 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.i
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.i
@@ -26,14 +26,14 @@ typedef boost::shared_ptr<gr_buffer> gr_buffer_sptr;
 %rename(buffer) gr_make_buffer;
 %ignore gr_buffer;
 
-gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item);
+gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link);
 
 class gr_buffer {
  public:
   ~gr_buffer ();
 
  private:
-  gr_buffer (int nitems, size_t sizeof_item);
+  gr_buffer (int nitems, size_t sizeof_item, gr_block_sptr link);
 };
   
 
@@ -43,7 +43,7 @@ typedef boost::shared_ptr<gr_buffer_reader> gr_buffer_reader_sptr;
 %ignore gr_buffer_reader;
 
 %rename(buffer_add_reader) gr_buffer_add_reader;
-gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload);
+gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link);
 
 class gr_buffer_reader {
  public:
@@ -51,7 +51,7 @@ class gr_buffer_reader {
 
  private:
   friend class gr_buffer;
-  gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index);
+  gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index, gr_block_sptr link);
 };
 
 
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
index aa1aa83532..031eb6dfd5 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
@@ -33,6 +33,11 @@
 
 #define GR_FLAT_FLOWGRAPH_DEBUG 0
 
+// 32Kbyte buffer size between blocks
+#define GR_FIXED_BUFFER_SIZE (32*(1L<<10))
+
+static const unsigned int s_fixed_buffer_size = GR_FIXED_BUFFER_SIZE;
+
 gr_flat_flowgraph_sptr
 gr_make_flat_flowgraph()
 {
@@ -54,7 +59,7 @@ gr_flat_flowgraph::setup_connections()
 
   // Assign block details to blocks
   for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++)
-    make_gr_block_sptr(*p)->set_detail(allocate_block_detail(*p));
+    cast_to_block_sptr(*p)->set_detail(allocate_block_detail(*p));
 
   // Connect inputs to outputs for each block
   for(gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++)
@@ -84,11 +89,15 @@ gr_flat_flowgraph::allocate_block_detail(gr_basic_block_sptr block)
 gr_buffer_sptr
 gr_flat_flowgraph::allocate_buffer(gr_basic_block_sptr block, int port)
 {
-  gr_block_sptr grblock = make_gr_block_sptr(block);
+  gr_block_sptr grblock = cast_to_block_sptr(block);
   if (!grblock)
     throw std::runtime_error("allocate_buffer found non-gr_block");
   int item_size = block->output_signature()->sizeof_stream_item(port);
-  int nitems = s_fixed_buffer_size/item_size;
+
+  // *2 because we're now only filling them 1/2 way in order to
+  // increase the available parallelism when using the TPB scheduler.
+  // (We're double buffering, where we used to single buffer)
+  int nitems = s_fixed_buffer_size * 2 / item_size;
 
   // Make sure there are at least twice the output_multiple no. of items
   if (nitems < 2*grblock->output_multiple())	// Note: this means output_multiple()
@@ -99,7 +108,7 @@ gr_flat_flowgraph::allocate_buffer(gr_basic_block_sptr block, int port)
   gr_basic_block_vector_t blocks = calc_downstream_blocks(block, port);
 
   for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
-    gr_block_sptr dgrblock = make_gr_block_sptr(*p);
+    gr_block_sptr dgrblock = cast_to_block_sptr(*p);
     if (!dgrblock)
       throw std::runtime_error("allocate_buffer found non-gr_block");
 
@@ -109,13 +118,13 @@ gr_flat_flowgraph::allocate_buffer(gr_basic_block_sptr block, int port)
     nitems = std::max(nitems, static_cast<int>(2*(decimation*multiple+history)));
   }
 
-  return gr_make_buffer(nitems, item_size);
+  return gr_make_buffer(nitems, item_size, grblock);
 }
 
 void
 gr_flat_flowgraph::connect_block_inputs(gr_basic_block_sptr block)
 {
-  gr_block_sptr grblock = make_gr_block_sptr(block);
+  gr_block_sptr grblock = cast_to_block_sptr(block);
   if (!grblock)
     throw std::runtime_error("connect_block_inputs found non-gr_block");
   
@@ -130,7 +139,7 @@ gr_flat_flowgraph::connect_block_inputs(gr_basic_block_sptr block)
     int dst_port = e->dst().port();
     int src_port = e->src().port();
     gr_basic_block_sptr src_block = e->src().block();
-    gr_block_sptr src_grblock = make_gr_block_sptr(src_block);
+    gr_block_sptr src_grblock = cast_to_block_sptr(src_block);
     if (!src_grblock)
       throw std::runtime_error("connect_block_inputs found non-gr_block");
     gr_buffer_sptr src_buffer = src_grblock->detail()->output(src_port);
@@ -138,7 +147,7 @@ gr_flat_flowgraph::connect_block_inputs(gr_basic_block_sptr block)
     if (GR_FLAT_FLOWGRAPH_DEBUG)
       std::cout << "Setting input " << dst_port << " from edge " << (*e) << std::endl;
 
-    detail->set_input(dst_port, gr_buffer_add_reader(src_buffer, grblock->history()-1));
+    detail->set_input(dst_port, gr_buffer_add_reader(src_buffer, grblock->history()-1, grblock));
   }
 }
 
@@ -149,7 +158,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
   // by flattening will need one; existing blocks still in the new flowgraph will
   // already have one.
   for (gr_basic_block_viter_t p = d_blocks.begin(); p != d_blocks.end(); p++) {
-    gr_block_sptr block = make_gr_block_sptr(*p);
+    gr_block_sptr block = cast_to_block_sptr(*p);
     
     if (!block->detail()) {
       if (GR_FLAT_FLOWGRAPH_DEBUG)
@@ -177,7 +186,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
       if (GR_FLAT_FLOWGRAPH_DEBUG)
 	std::cout << "not in new edge list" << std::endl;
       // zero the buffer reader on RHS of old edge
-      gr_block_sptr block(make_gr_block_sptr(old_edge->dst().block()));
+      gr_block_sptr block(cast_to_block_sptr(old_edge->dst().block()));
       int port = old_edge->dst().port();
       block->detail()->set_input(port, gr_buffer_reader_sptr());
     }
@@ -189,7 +198,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
 
   // Now connect inputs to outputs, reusing old buffer readers if they exist
   for (gr_basic_block_viter_t p = d_blocks.begin(); p != d_blocks.end(); p++) {
-    gr_block_sptr block = make_gr_block_sptr(*p);
+    gr_block_sptr block = cast_to_block_sptr(*p);
 
     if (GR_FLAT_FLOWGRAPH_DEBUG)
       std::cout << "merge: merging " << (*p) << "...";
@@ -208,7 +217,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
 	gr_edge edge = calc_upstream_edge(*p, i);
 
 	// Fish out old buffer reader and see if it matches correct buffer from edge list
-	gr_block_sptr src_block = make_gr_block_sptr(edge.src().block());
+	gr_block_sptr src_block = cast_to_block_sptr(edge.src().block());
 	gr_block_detail_sptr src_detail = src_block->detail();
 	gr_buffer_sptr src_buffer = src_detail->output(edge.src().port());
 	gr_buffer_reader_sptr old_reader;
@@ -225,7 +234,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
 	    std::cout << "needs a new reader" << std::endl;
 
 	  // Create new buffer reader and assign
-	  detail->set_input(i, gr_buffer_add_reader(src_buffer, block->history()-1));
+	  detail->set_input(i, gr_buffer_add_reader(src_buffer, block->history()-1, block));
 	}
       }
     }
@@ -248,7 +257,7 @@ void gr_flat_flowgraph::dump()
 
   for (gr_basic_block_viter_t p = d_blocks.begin(); p != d_blocks.end(); p++) {
     std::cout << " block: " << (*p) << std::endl;
-    gr_block_detail_sptr detail = make_gr_block_sptr(*p)->detail();
+    gr_block_detail_sptr detail = cast_to_block_sptr(*p)->detail();
     std::cout << "  detail @" << detail << ":" << std::endl;
      
     int ni = detail->ninputs();
@@ -269,3 +278,14 @@ void gr_flat_flowgraph::dump()
   }
 
 }
+
+gr_block_vector_t
+gr_flat_flowgraph::make_block_vector(gr_basic_block_vector_t &blocks)
+{
+  gr_block_vector_t result;
+  for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
+    result.push_back(cast_to_block_sptr(*p));
+  }
+
+  return result;
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
index 184ee45144..673c4df16f 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
@@ -26,9 +26,6 @@
 #include <gr_flowgraph.h>
 #include <gr_block.h>
 
-// 32Kbyte buffer size between blocks
-#define GR_FIXED_BUFFER_SIZE (32*(1L<<10))
-
 // Create a shared pointer to a heap allocated gr_flat_flowgraph
 // (types defined in gr_runtime_types.h)
 gr_flat_flowgraph_sptr gr_make_flat_flowgraph();
@@ -55,10 +52,14 @@ public:
 
   void dump();
 
+  /*!
+   * Make a vector of gr_block from a vector of gr_basic_block
+   */
+  static gr_block_vector_t make_block_vector(gr_basic_block_vector_t &blocks);
+
 private:
   gr_flat_flowgraph();
 
-  static const unsigned int s_fixed_buffer_size = GR_FIXED_BUFFER_SIZE;
   gr_block_detail_sptr allocate_block_detail(gr_basic_block_sptr block);
   gr_buffer_sptr allocate_buffer(gr_basic_block_sptr block, int port);
   void connect_block_inputs(gr_basic_block_sptr block);
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
index c97a50782c..fc407e72be 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
@@ -122,6 +122,9 @@ public:
   // Return vector of connected blocks
   gr_basic_block_vector_t calc_used_blocks();
 
+  // Return toplogically sorted vector of blocks.  All the sources come first.
+  gr_basic_block_vector_t topological_sort(gr_basic_block_vector_t &blocks);
+
   // Return vector of vectors of disjointly connected blocks, topologically
   // sorted.
   std::vector<gr_basic_block_vector_t> partition();
@@ -149,7 +152,6 @@ private:
   gr_basic_block_vector_t calc_reachable_blocks(gr_basic_block_sptr block, gr_basic_block_vector_t &blocks);
   void reachable_dfs_visit(gr_basic_block_sptr block, gr_basic_block_vector_t &blocks);
   gr_basic_block_vector_t calc_adjacent_blocks(gr_basic_block_sptr block, gr_basic_block_vector_t &blocks);
-  gr_basic_block_vector_t topological_sort(gr_basic_block_vector_t &blocks);
   gr_basic_block_vector_t sort_sources_first(gr_basic_block_vector_t &blocks);
   bool source_p(gr_basic_block_sptr block);
   void topological_dfs_visit(gr_basic_block_sptr block, gr_basic_block_vector_t &output);
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
index 32cac2ea8c..a026851d20 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
@@ -303,7 +303,7 @@ gr_hier_block2_detail::resolve_endpoint(const gr_endpoint &endp, bool is_input)
   std::stringstream msg;
 
   // Check if endpoint is a leaf node
-  if (make_gr_block_sptr(endp.block()))
+  if (cast_to_block_sptr(endp.block()))
     return endp;
   
   // Check if endpoint is a hierarchical block
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler.cc b/gnuradio-core/src/lib/runtime/gr_scheduler.cc
new file mode 100644
index 0000000000..e4d8b3dd9a
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler.cc
@@ -0,0 +1,33 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_scheduler.h>
+
+gr_scheduler::gr_scheduler(gr_flat_flowgraph_sptr ffg)
+{
+}
+
+gr_scheduler::~gr_scheduler()
+{
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler.h b/gnuradio-core/src/lib/runtime/gr_scheduler.h
new file mode 100644
index 0000000000..13bc1ff145
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler.h
@@ -0,0 +1,64 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef INCLUDED_GR_SCHEDULER_H
+#define INCLUDED_GR_SCHEDULER_H
+
+#include <boost/utility.hpp>
+#include <gr_block.h>
+#include <gr_flat_flowgraph.h>
+
+
+class gr_scheduler;
+typedef boost::shared_ptr<gr_scheduler> gr_scheduler_sptr;
+
+
+/*!
+ * \brief Abstract scheduler that takes a flattened flow graph and runs it.
+ *
+ * Preconditions: details, buffers and buffer readers have been assigned.
+ */
+class gr_scheduler : boost::noncopyable
+{
+
+public:
+  /*!
+   * \brief Construct a scheduler and begin evaluating the graph.
+   *
+   * The scheduler will continue running until all blocks until they
+   * report that they are done or the stop method is called.
+   */
+  gr_scheduler(gr_flat_flowgraph_sptr ffg);
+
+  virtual ~gr_scheduler();
+
+  /*!
+   * \brief Tell the scheduler to stop executing.
+   */
+  virtual void stop() = 0;
+
+  /*!
+   * \brief Block until the graph is done.
+   */
+  virtual void wait() = 0;
+};
+
+#endif /* INCLUDED_GR_SCHEDULER_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc
new file mode 100644
index 0000000000..fefc0dc703
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc
@@ -0,0 +1,87 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_scheduler_sts.h>
+#include <gr_single_threaded_scheduler.h>
+#include <gruel/thread_body_wrapper.h>
+
+class sts_container
+{
+  gr_block_vector_t	d_blocks;
+  
+public:
+
+  sts_container(gr_block_vector_t blocks)
+    : d_blocks(blocks) {}
+
+  void operator()()
+  {
+    gr_make_single_threaded_scheduler(d_blocks)->run();
+  }
+};
+
+
+gr_scheduler_sptr
+gr_scheduler_sts::make(gr_flat_flowgraph_sptr ffg)
+{
+  return gr_scheduler_sptr(new gr_scheduler_sts(ffg));
+}
+
+gr_scheduler_sts::gr_scheduler_sts(gr_flat_flowgraph_sptr ffg)
+  : gr_scheduler(ffg)
+{
+  // Split the flattened flow graph into discrete partitions, each
+  // of which is topologically sorted.
+
+  std::vector<gr_basic_block_vector_t> graphs = ffg->partition();
+
+  // For each partition, create a thread to evaluate it using
+  // an instance of the gr_single_threaded_scheduler
+
+  for (std::vector<gr_basic_block_vector_t>::iterator p = graphs.begin();
+       p != graphs.end(); p++) {
+
+    gr_block_vector_t blocks = gr_flat_flowgraph::make_block_vector(*p);
+    d_threads.create_thread(
+        gruel::thread_body_wrapper<sts_container>(sts_container(blocks),
+						  "single-threaded-scheduler"));
+  }
+}
+
+gr_scheduler_sts::~gr_scheduler_sts()
+{
+  stop();
+}
+
+void
+gr_scheduler_sts::stop()
+{
+  d_threads.interrupt_all();
+}
+
+void
+gr_scheduler_sts::wait()
+{
+  d_threads.join_all();
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h
new file mode 100644
index 0000000000..4cf8351561
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h
@@ -0,0 +1,62 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GR_SCHEDULER_STS_H
+#define INCLUDED_GR_SCHEDULER_STS_H
+
+#include <gr_scheduler.h>
+#include <gruel/thread_group.h>
+
+/*!
+ * \brief Concrete scheduler that uses the single_threaded_scheduler
+ */
+class gr_scheduler_sts : public gr_scheduler
+{
+  gruel::thread_group		       d_threads;
+
+protected:
+  /*!
+   * \brief Construct a scheduler and begin evaluating the graph.
+   *
+   * The scheduler will continue running until all blocks until they
+   * report that they are done or the stop method is called.
+   */
+  gr_scheduler_sts(gr_flat_flowgraph_sptr ffg);
+
+public:
+  static gr_scheduler_sptr make(gr_flat_flowgraph_sptr ffg);
+
+  ~gr_scheduler_sts();
+
+  /*!
+   * \brief Tell the scheduler to stop executing.
+   */
+  void stop();
+
+  /*!
+   * \brief Block until the graph is done.
+   */
+  void wait();
+};
+
+
+
+
+#endif /* INCLUDED_GR_SCHEDULER_STS_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.cc b/gnuradio-core/src/lib/runtime/gr_scheduler_thread.cc
deleted file mode 100644
index 07bd60500d..0000000000
--- a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.cc
+++ /dev/null
@@ -1,110 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2007 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- *
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING.  If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include <gr_scheduler_thread.h>
-#include <iostream>
-#include <stdio.h>
-
-#ifdef HAVE_SIGNAL_H
-#include <signal.h>
-#endif
-
-#define GR_SCHEDULER_THREAD_DEBUG 0
-
-gr_scheduler_thread::gr_scheduler_thread(gr_block_vector_t graph) :
-  omni_thread(NULL, PRIORITY_NORMAL),
-  d_sts(gr_make_single_threaded_scheduler(graph))
-{
-}
-
-gr_scheduler_thread::~gr_scheduler_thread()
-{
-}
-
-void gr_scheduler_thread::start()
-{
-  if (GR_SCHEDULER_THREAD_DEBUG)
-    std::cout << "gr_scheduler_thread::start() "
-	      << this << std::endl;
-  start_undetached();
-}
-
-void *
-gr_scheduler_thread::run_undetached(void *arg)
-{
-  // This is the first code to run in the new thread context.
-
-  /*
-   * In general, on a *nix system, any thread of a process can receive
-   * any asynchronous signal.
-   *
-   * http://www.serpentine.com/blog/threads-faq/mixing-threads-and-signals-unix/
-   * http://www.linuxjournal.com/article/2121
-   * 
-   * We really don't want to be handling asynchronous signals such
-   * as SIGINT and SIGHUP here.  We mask them off in the signal
-   * processing threads so that they'll get handled by the mainline
-   * thread.  We leave the synchronous signals SIGQUIT, SIGBUS,
-   * SIGILL, SIGSEGV etc alone
-   *
-   * FIXME? It might be better to mask them all off in the parent
-   * thread then dedicate a single thread to handling all signals
-   * using sigwait.
-   */
-#if defined(HAVE_PTHREAD_SIGMASK) || defined(HAVE_SIGPROCMASK)
-  sigset_t old_set;
-  sigset_t new_set;
-  int r;
-  sigemptyset(&new_set);
-  sigaddset(&new_set, SIGINT);
-  sigaddset(&new_set, SIGHUP);
-  sigaddset(&new_set, SIGPIPE);
-  sigaddset(&new_set, SIGALRM);
-  sigaddset(&new_set, SIGCHLD);
-
-#ifdef HAVE_PTHREAD_SIGMASK
-  r = pthread_sigmask(SIG_BLOCK, &new_set, &old_set);
-  if (r != 0)
-    perror("pthread_sigmask");
-#else
-  r = sigprocmask(SIG_BLOCK, &new_set, &old_set);
-  if (r != 0)
-    perror("sigprocmask");
-#endif
-#endif
-  // Run the single-threaded scheduler
-  d_sts->run();
-  return 0;
-}
-
-void
-gr_scheduler_thread::stop()
-{
-  if (0 && GR_SCHEDULER_THREAD_DEBUG)		// FIXME not safe to call from signal handler
-    std::cout << "gr_scheduler_thread::stop() "
-	      << this << std::endl;
-  d_sts->stop();
-}
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.h b/gnuradio-core/src/lib/runtime/gr_scheduler_thread.h
deleted file mode 100644
index 89daba4031..0000000000
--- a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2007 Free Software Foundation, Inc.
- * 
- * This file is part of GNU Radio
- * 
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- * 
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- * 
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING.  If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifndef INCLUDED_GR_SCHEDULER_THREAD_H
-#define INCLUDED_GR_SCHEDULER_THREAD_H
-
-#include <omnithread.h>
-#include <gr_single_threaded_scheduler.h>
-#include <gr_block.h>
-
-// omnithread calls delete on itself after thread exits, so can't use shared ptr
-class gr_scheduler_thread;
-typedef std::vector<gr_scheduler_thread *> gr_scheduler_thread_vector_t;
-typedef gr_scheduler_thread_vector_t::iterator gr_scheduler_thread_viter_t;
-
-/*!
- *\brief A single thread of execution for the scheduler
- *
- * \ingroup internal
- * This class implements a single thread that runs undetached, and
- * invokes the single-threaded block scheduler.  The runtime makes
- * one of these for each distinct partition of a flowgraph and runs
- * them in parallel.
- *
- */
-class gr_scheduler_thread : public omni_thread
-{
-private:
-  gr_single_threaded_scheduler_sptr d_sts;    
-
-public:
-  gr_scheduler_thread(gr_block_vector_t graph);
-  ~gr_scheduler_thread();
-
-  virtual void *run_undetached(void *arg);
-  void start();
-  void stop();
-};
-
-#endif /* INCLUDED_GR_SCHEDULER_THREAD_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc
new file mode 100644
index 0000000000..af03385705
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc
@@ -0,0 +1,95 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_scheduler_tpb.h>
+#include <gr_tpb_thread_body.h>
+#include <gruel/thread_body_wrapper.h>
+#include <sstream>
+
+/*
+ * You know, a lambda expression would be sooo much easier...
+ */
+class tpb_container
+{
+  gr_block_sptr	d_block;
+  
+public:
+  tpb_container(gr_block_sptr block) : d_block(block) {}
+
+  void operator()()
+  {
+    gr_tpb_thread_body	body(d_block);
+  }
+};
+
+
+gr_scheduler_sptr
+gr_scheduler_tpb::make(gr_flat_flowgraph_sptr ffg)
+{
+  return gr_scheduler_sptr(new gr_scheduler_tpb(ffg));
+}
+
+gr_scheduler_tpb::gr_scheduler_tpb(gr_flat_flowgraph_sptr ffg)
+  : gr_scheduler(ffg)
+{
+  // Get a topologically sorted vector of all the blocks in use.
+  // Being topologically sorted probably isn't going to matter, but
+  // there's a non-zero chance it might help...
+
+  gr_basic_block_vector_t used_blocks = ffg->calc_used_blocks();
+  used_blocks = ffg->topological_sort(used_blocks);
+  gr_block_vector_t blocks = gr_flat_flowgraph::make_block_vector(used_blocks);
+
+  // Ensure that the done flag is clear on all blocks
+
+  for (size_t i = 0; i < blocks.size(); i++){
+    blocks[i]->detail()->set_done(false);
+  }
+
+  // Fire off a thead for each block
+
+  for (size_t i = 0; i < blocks.size(); i++){
+    std::stringstream name;
+    name << "thread-per-block[" << i << "]: " << blocks[i];
+    d_threads.create_thread(
+      gruel::thread_body_wrapper<tpb_container>(tpb_container(blocks[i]), name.str()));
+  }
+}
+
+gr_scheduler_tpb::~gr_scheduler_tpb()
+{
+  stop();
+}
+
+void
+gr_scheduler_tpb::stop()
+{
+  d_threads.interrupt_all();
+}
+
+void
+gr_scheduler_tpb::wait()
+{
+  d_threads.join_all();
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h
new file mode 100644
index 0000000000..16a0c0204e
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h
@@ -0,0 +1,60 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GR_SCHEDULER_TPB_H
+#define INCLUDED_GR_SCHEDULER_TPB_H
+
+#include <gr_scheduler.h>
+#include <gruel/thread_group.h>
+
+/*!
+ * \brief Concrete scheduler that uses a kernel thread-per-block
+ */
+class gr_scheduler_tpb : public gr_scheduler
+{
+  gruel::thread_group		       d_threads;
+
+protected:
+  /*!
+   * \brief Construct a scheduler and begin evaluating the graph.
+   *
+   * The scheduler will continue running until all blocks until they
+   * report that they are done or the stop method is called.
+   */
+  gr_scheduler_tpb(gr_flat_flowgraph_sptr ffg);
+
+public:
+  static gr_scheduler_sptr make(gr_flat_flowgraph_sptr ffg);
+
+  ~gr_scheduler_tpb();
+
+  /*!
+   * \brief Tell the scheduler to stop executing.
+   */
+  void stop();
+
+  /*!
+   * \brief Block until the graph is done.
+   */
+  void wait();
+};
+
+
+#endif /* INCLUDED_GR_SCHEDULER_TPB_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc b/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc
index b2fbdb73be..7f1b40641e 100644
--- a/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc
+++ b/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc
@@ -28,6 +28,7 @@
 #include <gr_block.h>
 #include <gr_block_detail.h>
 #include <gr_buffer.h>
+#include <boost/thread.hpp>
 #include <iostream>
 #include <limits>
 #include <assert.h>
@@ -44,14 +45,6 @@
 
 static int which_scheduler  = 0;
 
-
-std::ostream&
-operator << (std::ostream& os, const gr_block *m)
-{
-  os << "<gr_block " << m->name() << " (" << m->unique_id() << ")>";
-  return os;
-}
-
 gr_single_threaded_scheduler_sptr
 gr_make_single_threaded_scheduler (const std::vector<gr_block_sptr> &blocks)
 {
@@ -162,6 +155,9 @@ gr_single_threaded_scheduler::main_loop ()
   nalive = d_blocks.size ();
   while (d_enabled && nalive > 0){
 
+    if (boost::this_thread::interruption_requested())
+      break;
+
     gr_block		*m = d_blocks[bi].get ();
     gr_block_detail	*d = m->detail().get ();
 
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block.cc b/gnuradio-core/src/lib/runtime/gr_top_block.cc
index 3c8e28f701..09e46dfbb4 100644
--- a/gnuradio-core/src/lib/runtime/gr_top_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_top_block.cc
@@ -27,7 +27,6 @@
 #include <unistd.h>
 #include <gr_top_block.h>
 #include <gr_top_block_impl.h>
-#include <gr_top_block_impl_sts.h>
 #include <gr_io_signature.h>
 #include <iostream>
 
@@ -43,7 +42,7 @@ gr_top_block::gr_top_block(const std::string &name)
 		   gr_make_io_signature(0,0,0))
   
 {
-  d_impl = new gr_top_block_impl_sts(this);
+  d_impl = new gr_top_block_impl(this);
 }
   
 gr_top_block::~gr_top_block()
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc b/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
index 5914379384..50d480d009 100644
--- a/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
+++ b/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
@@ -27,21 +27,58 @@
 #include <gr_top_block.h>
 #include <gr_top_block_impl.h>
 #include <gr_flat_flowgraph.h>
-#include <gr_scheduler_thread.h>
-#include <gr_local_sighandler.h>
+#include <gr_scheduler_sts.h>
+#include <gr_scheduler_tpb.h>
 
 #include <stdexcept>
 #include <iostream>
 #include <string.h>
 #include <unistd.h>
+#include <stdlib.h>
 
 #define GR_TOP_BLOCK_IMPL_DEBUG 0
 
+
+typedef gr_scheduler_sptr (*scheduler_maker)(gr_flat_flowgraph_sptr ffg);
+
+static struct scheduler_table {
+  const char 	       *name;
+  scheduler_maker	f;
+} scheduler_table[] = {
+  { "TPB",	gr_scheduler_tpb::make },	// first entry is default
+  { "STS",	gr_scheduler_sts::make }
+};
+
+static gr_scheduler_sptr
+make_scheduler(gr_flat_flowgraph_sptr ffg)
+{
+  static scheduler_maker  factory = 0;
+
+  if (factory == 0){
+    char *v = getenv("GR_SCHEDULER");
+    if (!v)
+      factory = scheduler_table[0].f;	// use default
+    else {
+      for (size_t i = 0; i < sizeof(scheduler_table)/sizeof(scheduler_table[0]); i++){
+	if (strcmp(v, scheduler_table[i].name) == 0){
+	  factory = scheduler_table[i].f;
+	  break;
+	}
+      }
+      if (factory == 0){
+	std::cerr << "warning: Invalid GR_SCHEDULER environment variable value \""
+		  << v << "\".  Using \"" << scheduler_table[0].name << "\"\n";
+	factory = scheduler_table[0].f;
+      }
+    }
+  }
+  return factory(ffg);
+}
+
+
 gr_top_block_impl::gr_top_block_impl(gr_top_block *owner) 
-  : d_owner(owner),
-    d_running(false),
-    d_ffg(),
-    d_lock_count(0)
+  : d_owner(owner), d_ffg(),
+    d_state(IDLE), d_lock_count(0)
 {
 }
 
@@ -53,14 +90,13 @@ gr_top_block_impl::~gr_top_block_impl()
 void
 gr_top_block_impl::start()
 {
-  if (GR_TOP_BLOCK_IMPL_DEBUG)
-    std::cout << "start: entered " << this << std::endl;
+  gr_lock_guard	l(d_mutex);
 
-  if (d_running)
+  if (d_state != IDLE)
     throw std::runtime_error("top_block::start: top block already running or wait() not called after previous stop()");
 
   if (d_lock_count > 0)
-    throw std::runtime_error("top_block::start: can't call start with flow graph locked");
+    throw std::runtime_error("top_block::start: can't start with flow graph locked");
 
   // Create new flat flow graph by flattening hierarchy
   d_ffg = d_owner->flatten();
@@ -69,77 +105,71 @@ gr_top_block_impl::start()
   d_ffg->validate();
   d_ffg->setup_connections();
 
-  // Execute scheduler threads
-  start_threads();
-  d_running = true;
+  d_scheduler = make_scheduler(d_ffg);
+  d_state = RUNNING;
 }
 
+void 
+gr_top_block_impl::stop()
+{
+  if (d_scheduler)
+    d_scheduler->stop();
+}
+
+
+void
+gr_top_block_impl::wait()
+{
+  if (d_scheduler)
+    d_scheduler->wait();
+
+  d_state = IDLE;
+}
 
 // N.B. lock() and unlock() cannot be called from a flow graph thread or
 // deadlock will occur when reconfiguration happens
 void
 gr_top_block_impl::lock()
 {
-  omni_mutex_lock lock(d_reconf);
+  gr_lock_guard lock(d_mutex);
   d_lock_count++;
-  if (GR_TOP_BLOCK_IMPL_DEBUG)
-    std::cout << "runtime: locked, count = " << d_lock_count <<  std::endl;
 }
 
 void
 gr_top_block_impl::unlock()
 {
-  omni_mutex_lock lock(d_reconf);
+  gr_lock_guard lock(d_mutex);
+
   if (d_lock_count <= 0){
     d_lock_count = 0;		// fix it, then complain
     throw std::runtime_error("unpaired unlock() call");
   }
 
   d_lock_count--;
-  if (GR_TOP_BLOCK_IMPL_DEBUG)
-    std::cout << "unlock: unlocked, count = " << d_lock_count << std::endl;
+  if (d_lock_count > 0 || d_state == IDLE) // nothing to do
+    return;
 
-  if (d_lock_count == 0) {
-    if (GR_TOP_BLOCK_IMPL_DEBUG)
-      std::cout << "unlock: restarting flowgraph" << std::endl;
-    restart();
-  }
+  restart();
 }
 
+/*
+ * restart is called with d_mutex held
+ */
 void
 gr_top_block_impl::restart()
 {
-  if (GR_TOP_BLOCK_IMPL_DEBUG)
-    std::cout << "restart: entered" << std::endl;
-
-  if (!d_running)
-    return;		// nothing to do
-
-  // Stop scheduler threads and wait for completion
-  stop();
+  stop();		     // Stop scheduler and wait for completion
   wait();
-  if (GR_TOP_BLOCK_IMPL_DEBUG)
-    std::cout << "restart: threads stopped" << std::endl;
 
   // Create new simple flow graph
   gr_flat_flowgraph_sptr new_ffg = d_owner->flatten();        
   new_ffg->validate();		       // check consistency, sanity, etc
-
-  if (GR_TOP_BLOCK_IMPL_DEBUG) {
-      std::cout << std::endl << "*** Existing flat flowgraph @" << d_ffg << ":" << std::endl;
-      d_ffg->dump();
-  }
   new_ffg->merge_connections(d_ffg);   // reuse buffers, etc
-
-  if (GR_TOP_BLOCK_IMPL_DEBUG) {
-    std::cout << std::endl << "*** New flat flowgraph after merge @" << new_ffg << ":" << std::endl;
-    new_ffg->dump();
-  }
-  
   d_ffg = new_ffg;
 
-  start_threads();
-  d_running = true;
+  // Create a new scheduler to execute it
+  d_scheduler = make_scheduler(d_ffg);
+  d_state = RUNNING;
 }
 
 void
@@ -148,14 +178,3 @@ gr_top_block_impl::dump()
   if (d_ffg)
     d_ffg->dump();
 }
-
-gr_block_vector_t
-gr_top_block_impl::make_gr_block_vector(gr_basic_block_vector_t blocks)
-{
-  gr_block_vector_t result;
-  for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
-    result.push_back(make_gr_block_sptr(*p));
-  }
-
-  return result;
-}
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl.h b/gnuradio-core/src/lib/runtime/gr_top_block_impl.h
index 869f788ef4..35fb44ef92 100644
--- a/gnuradio-core/src/lib/runtime/gr_top_block_impl.h
+++ b/gnuradio-core/src/lib/runtime/gr_top_block_impl.h
@@ -23,7 +23,11 @@
 #ifndef INCLUDED_GR_TOP_BLOCK_IMPL_H
 #define INCLUDED_GR_TOP_BLOCK_IMPL_H
 
-#include <gr_scheduler_thread.h>
+#include <gr_scheduler.h>
+#include <boost/thread.hpp>
+
+typedef boost::mutex			gr_mutex; 	// FIXME move these elsewhere
+typedef boost::lock_guard<boost::mutex>	gr_lock_guard;
 
 /*!
  *\brief Abstract implementation details of gr_top_block
@@ -37,16 +41,16 @@ class gr_top_block_impl
 {
 public:
   gr_top_block_impl(gr_top_block *owner);
-  virtual ~gr_top_block_impl();
+  ~gr_top_block_impl();
 
   // Create and start scheduler threads
-  virtual void start();
+  void start();
 
   // Signal scheduler threads to stop
-  virtual void stop() = 0;
+  void stop();
 
   // Wait for scheduler threads to exit
-  virtual void wait() = 0;
+  void wait();
 
   // Lock the top block to allow reconfiguration
   void lock();
@@ -59,22 +63,16 @@ public:
   
 protected:
     
+  enum tb_state { IDLE, RUNNING };
+
   gr_top_block                  *d_owner;
-  bool                           d_running;
   gr_flat_flowgraph_sptr         d_ffg;
+  gr_scheduler_sptr		 d_scheduler;
 
-  omni_mutex                     d_reconf;	// protects d_lock_count
+  gr_mutex                       d_mutex;	// protects d_state and d_lock_count
+  tb_state			 d_state;
   int                            d_lock_count;
-
-  virtual void start_threads() = 0;
-
-/*!
- * Make a vector of gr_block from a vector of gr_basic_block
- *
- * Pass-by-value to avoid problem with possible asynchronous modification
- */
-  static gr_block_vector_t make_gr_block_vector(gr_basic_block_vector_t blocks);
-
+  
 private:
   void restart();
 };
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.cc b/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.cc
deleted file mode 100644
index b3e9da6275..0000000000
--- a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.cc
+++ /dev/null
@@ -1,128 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2007,2008 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- *
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING.  If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include <gr_top_block.h>
-#include <gr_top_block_impl_sts.h>
-#include <gr_flat_flowgraph.h>
-#include <gr_scheduler_thread.h>
-#include <gr_local_sighandler.h>
-
-#include <stdexcept>
-#include <iostream>
-#include <string.h>
-#include <unistd.h>
-
-#define GR_TOP_BLOCK_IMPL_STS_DEBUG 0
-
-static gr_top_block_impl *s_impl = 0;
-
-
-// FIXME: This prevents using more than one gr_top_block instance
-
-static void 
-runtime_sigint_handler(int signum)
-{
-  if (GR_TOP_BLOCK_IMPL_STS_DEBUG){
-    char *msg = "SIGINT received, calling stop()\n";
-    ::write(1, msg, strlen(msg));	// write is OK to call from signal handler
-  }
-
-  if (s_impl)
-    s_impl->stop();
-}
-
-// ----------------------------------------------------------------
-
-gr_top_block_impl_sts::gr_top_block_impl_sts(gr_top_block *owner) 
-  : gr_top_block_impl(owner)
-{
-  if (s_impl)
-    throw std::logic_error("gr_top_block_impl_sts: multiple simultaneous gr_top_blocks not allowed");
-
-  s_impl = this;
-}
-
-gr_top_block_impl_sts::~gr_top_block_impl_sts()
-{
-  s_impl = 0; // don't call delete we don't own these
-}
-
-void
-gr_top_block_impl_sts::start_threads()
-{
-  if (GR_TOP_BLOCK_IMPL_STS_DEBUG)
-    std::cout << "start_threads: entered" << std::endl;
-
-  d_graphs = d_ffg->partition();
-  for (std::vector<gr_basic_block_vector_t>::iterator p = d_graphs.begin();
-       p != d_graphs.end(); p++) {
-    gr_scheduler_thread *thread = new gr_scheduler_thread(make_gr_block_vector(*p));
-    d_threads.push_back(thread);
-    if (GR_TOP_BLOCK_IMPL_STS_DEBUG)
-      std::cout << "start_threads: starting " << thread << std::endl;
-    thread->start();
-  }
-}
-
-/*
- * N.B. as currently implemented, it is possible that this may be
- * invoked by the SIGINT handler which is fragile as hell...
- */
-void
-gr_top_block_impl_sts::stop()
-{
-  if (GR_TOP_BLOCK_IMPL_STS_DEBUG){
-    char *msg = "stop: entered\n";
-    ::write(1, msg, strlen(msg));
-  }
-
-  for (gr_scheduler_thread_viter_t p = d_threads.begin(); p != d_threads.end(); p++) {
-    if (*p)
-      (*p)->stop();
-  }
-}
-
-void
-gr_top_block_impl_sts::wait()
-{
-  if (GR_TOP_BLOCK_IMPL_STS_DEBUG)
-    std::cout << "wait: entered" << std::endl;
-
-  void *dummy_status; // don't ever dereference this
-  gr_local_sighandler sigint(SIGINT, runtime_sigint_handler);
-
-  for (gr_scheduler_thread_viter_t p = d_threads.begin(); p != d_threads.end(); p++) {
-    if (GR_TOP_BLOCK_IMPL_STS_DEBUG)
-      std::cout << "wait: joining thread " << (*p) << std::endl;
-    (*p)->join(&dummy_status); // omnithreads will self-delete, so pointer is now dead
-    (*p) = 0; // FIXME: switch to stl::list and actually remove from container
-    if (GR_TOP_BLOCK_IMPL_STS_DEBUG)
-      std::cout << "wait: join returned" << std::endl;
-  }
-
-  d_threads.clear();
-  d_running = false;
-}
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.h b/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.h
deleted file mode 100644
index ec2e51cf25..0000000000
--- a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2007 Free Software Foundation, Inc.
- * 
- * This file is part of GNU Radio
- * 
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- * 
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- * 
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING.  If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifndef INCLUDED_GR_TOP_BLOCK_IMPL_STS_H
-#define INCLUDED_GR_TOP_BLOCK_IMPL_STS_H
-
-#include <gr_top_block_impl.h>
-#include <gr_scheduler_thread.h>
-
-/*!
- *\brief Implementation details of gr_top_block
- * \ingroup internal
- *
- * Concrete implementation of gr_top_block using gr_single_threaded_scheduler.
- */
-class gr_top_block_impl_sts : public gr_top_block_impl
-{
-public:
-  gr_top_block_impl_sts(gr_top_block *owner);
-  ~gr_top_block_impl_sts();
-
-  // Signal scheduler threads to stop
-  void stop();
-
-  // Wait for scheduler threads to exit
-  void wait();
-
-private:
-    
-  gr_scheduler_thread_vector_t   d_threads;
-  std::vector<gr_basic_block_vector_t> d_graphs;
-
-  void start_threads();
-};
-
-#endif /* INCLUDED_GR_TOP_BLOCK_IMPL_STS_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
new file mode 100644
index 0000000000..02e8deed88
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
@@ -0,0 +1,67 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_tpb_detail.h>
+#include <gr_block.h>
+#include <gr_block_detail.h>
+#include <gr_buffer.h>
+
+/*
+ * We assume that no worker threads are ever running when the
+ * graph structure is being manipulated, thus it's safe for us to poke
+ * around in our neighbors w/o holding any locks.
+ */
+
+void
+gr_tpb_detail::notify_upstream(gr_block_detail *d)
+{
+  // For each of our inputs, tell the guy upstream that we've consumed
+  // some input, and that he most likely has more output buffer space
+  // available.
+
+  for (size_t i = 0; i < d->d_input.size(); i++){
+    // Can you say, "pointer chasing?"
+    d->d_input[i]->buffer()->link()->detail()->d_tpb.set_output_changed();
+  }
+}
+
+void
+gr_tpb_detail::notify_downstream(gr_block_detail *d)
+{
+  // For each of our outputs, tell the guys downstream that they have
+  // new input available.
+
+  for (size_t i = 0; i < d->d_output.size(); i++){
+    gr_buffer_sptr buf = d->d_output[i];
+    for (size_t j = 0, k = buf->nreaders(); j < k; j++)
+      buf->reader(j)->link()->detail()->d_tpb.set_input_changed();
+  }
+}
+
+void
+gr_tpb_detail::notify_neighbors(gr_block_detail *d)
+{
+  notify_downstream(d);
+  notify_upstream(d);
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
new file mode 100644
index 0000000000..9566312dc8
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
@@ -0,0 +1,81 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GR_TPB_DETAIL_H
+#define INCLUDED_GR_TPB_DETAIL_H
+
+#include <boost/thread.hpp>
+
+class gr_block_detail;
+
+/*!
+ * \brief used by thread-per-block scheduler
+ */
+struct gr_tpb_detail {
+  typedef boost::unique_lock<boost::mutex>  scoped_lock;
+
+  boost::mutex			mutex;			//< protects all vars
+  bool				input_changed;
+  boost::condition_variable	input_cond;
+  bool				output_changed;
+  boost::condition_variable	output_cond;
+
+  gr_tpb_detail()
+    : input_changed(false), output_changed(false) {}
+
+
+  //! Called by us to tell all our upstream blocks that their output may have changed.
+  void notify_upstream(gr_block_detail *d);
+
+  //! Called by us to tell all our downstream blocks that their input may have changed.
+  void notify_downstream(gr_block_detail *d);
+
+  //! Called by us to notify both upstream and downstream
+  void notify_neighbors(gr_block_detail *d);
+
+  //! Called by us
+  void clear_changed()
+  {
+    scoped_lock	guard(mutex);
+    input_changed = false;
+    output_changed = false;
+  }
+
+private:
+
+  //! Used by notify_downstream
+  void set_input_changed()
+  {
+    scoped_lock	guard(mutex);
+    input_changed = true;
+    input_cond.notify_one();
+  }
+
+  //! Used by notify_upstream
+  void set_output_changed()
+  {
+    scoped_lock	guard(mutex);
+    output_changed = true;
+    output_cond.notify_one();
+  }
+
+};
+
+#endif /* INCLUDED_GR_TPB_DETAIL_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
new file mode 100644
index 0000000000..f61e172436
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -0,0 +1,76 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_tpb_thread_body.h>
+#include <iostream>
+
+gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
+  : d_exec(block)
+{
+  // std::cerr << "gr_tpb_thread_body: " << block << std::endl;
+
+  gr_block_detail	*d = block->detail().get();
+  gr_block_executor::state s;
+
+  while (1){
+    d->d_tpb.clear_changed();
+    s = d_exec.run_one_iteration();
+
+    switch(s){
+    case gr_block_executor::READY:		// Tell neighbors we made progress.
+      d->d_tpb.notify_neighbors(d);
+      break;
+
+    case gr_block_executor::READY_NO_OUTPUT:	// Notify upstream only
+      d->d_tpb.notify_upstream(d);
+      break;
+
+    case gr_block_executor::DONE:		// Game over.
+      d->d_tpb.notify_neighbors(d);
+      return;
+
+    case gr_block_executor::BLKD_IN:		// Wait for input.
+      {
+	gr_tpb_detail::scoped_lock guard(d->d_tpb.mutex);
+	while(!d->d_tpb.input_changed)
+	  d->d_tpb.input_cond.wait(guard);
+      }
+      break;
+      
+    case gr_block_executor::BLKD_OUT:		// Wait for output buffer space.
+      {
+	gr_tpb_detail::scoped_lock guard(d->d_tpb.mutex);
+	while(!d->d_tpb.output_changed)
+	  d->d_tpb.output_cond.wait(guard);
+      }
+      break;
+
+    default:
+      assert(0);
+    }
+  }
+}
+
+gr_tpb_thread_body::~gr_tpb_thread_body()
+{
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h
new file mode 100644
index 0000000000..a630b1be9f
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h
@@ -0,0 +1,45 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GR_TPB_THREAD_BODY_H
+#define INCLUDED_GR_TPB_THREAD_BODY_H
+
+#include <gr_block_executor.h>
+#include <gr_block.h>
+#include <gr_block_detail.h>
+
+/*!
+ * \brief The body of each thread-per-block thread.
+ *
+ * One of these is instantiated in its own thread for each block.  The
+ * constructor turns into the main loop which returns when the block is
+ * done or is interrupted.
+ */
+
+class gr_tpb_thread_body {
+  gr_block_executor	d_exec;
+
+public:
+  gr_tpb_thread_body(gr_block_sptr block);
+  ~gr_tpb_thread_body();
+};
+
+
+#endif /* INCLUDED_GR_TPB_THREAD_BODY_H */
diff --git a/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc b/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc
index ad40f724d0..7434cf657f 100644
--- a/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc
+++ b/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc
@@ -52,7 +52,7 @@ t0_body ()
   int	nitems = 4000 / sizeof (int);
   int	counter = 0;
 
-  gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int)));
+  gr_buffer_sptr buf(gr_make_buffer(nitems, sizeof (int), gr_block_sptr()));
 
   int last_sa;
   int sa;
@@ -87,8 +87,8 @@ t1_body ()
   int	write_counter = 0;
   int	read_counter = 0;
 
-  gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int)));
-  gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0));
+  gr_buffer_sptr buf(gr_make_buffer(nitems, sizeof (int), gr_block_sptr()));
+  gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0, gr_block_sptr()));
   
 
   int sa;
@@ -162,8 +162,8 @@ t2_body ()
   
   int	nitems = (64 * (1L << 10)) / sizeof (int);	// 64K worth of ints
 
-  gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int)));
-  gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0));
+  gr_buffer_sptr buf(gr_make_buffer (nitems, sizeof (int), gr_block_sptr()));
+  gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0, gr_block_sptr()));
 
   int	read_counter = 0;
   int	write_counter = 0;
@@ -229,7 +229,7 @@ t3_body ()
   int	nitems = (64 * (1L << 10)) / sizeof (int);
 
   static const int N = 5;
-  gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int)));
+  gr_buffer_sptr buf(gr_make_buffer(nitems, sizeof (int), gr_block_sptr()));
   gr_buffer_reader_sptr 	reader[N];
   int			read_counter[N];
   int			write_counter = 0;
@@ -237,7 +237,7 @@ t3_body ()
 
   for (int i = 0; i < N; i++){
     read_counter[i] = 0;
-    reader[i] = gr_buffer_add_reader (buf, 0);
+    reader[i] = gr_buffer_add_reader (buf, 0, gr_block_sptr());
   }
 
   for (int lc = 0; lc < 1000; lc++){
-- 
cgit v1.2.3