From 2c8ea58e4d76f54c98d71d3fcc64bc29da490908 Mon Sep 17 00:00:00 2001
From: eb <eb@221aa14e-8319-0410-a670-987f0aec2ac5>
Date: Tue, 19 Aug 2008 23:09:56 +0000
Subject: Merged features/mp-sched -r8915:9335 into the trunk.  The trunk now
 contains the SMP aware scheduler.  This changeset introduces a dependency on
 boost 1.35 or later. See source:gnuradio/trunk/README.building-boost for
 additional info.

git-svn-id: http://gnuradio.org/svn/gnuradio/trunk@9336 221aa14e-8319-0410-a670-987f0aec2ac5
---
 gnuradio-core/src/lib/Makefile.am                  |   3 +
 gnuradio-core/src/lib/filter/Makefile.am           |  27 +-
 gnuradio-core/src/lib/filter/dotprod_fff_altivec.c | 162 ++++++++++
 gnuradio-core/src/lib/filter/dotprod_fff_altivec.h |  49 +++
 gnuradio-core/src/lib/filter/gr_altivec.c          |  38 +++
 gnuradio-core/src/lib/filter/gr_altivec.h          |  57 ++++
 gnuradio-core/src/lib/filter/gr_cpu.cc             | 136 ---------
 gnuradio-core/src/lib/filter/gr_cpu.h              |   1 +
 gnuradio-core/src/lib/filter/gr_cpu_powerpc.cc     |  59 ++++
 gnuradio-core/src/lib/filter/gr_cpu_x86.cc         | 113 +++++++
 gnuradio-core/src/lib/filter/gr_fir_fff_altivec.cc |  83 +++++
 gnuradio-core/src/lib/filter/gr_fir_fff_altivec.h  |  45 +++
 .../src/lib/filter/gr_fir_sysconfig_powerpc.cc     | 340 +++++++++++++++++++++
 .../src/lib/filter/gr_fir_sysconfig_powerpc.h      |  46 +++
 gnuradio-core/src/lib/filter/qa_dotprod_powerpc.cc |  32 ++
 gnuradio-core/src/lib/filter/qa_filter.cc          |   4 +-
 gnuradio-core/src/lib/filter/qa_gr_fir_fff.cc      |   4 +-
 gnuradio-core/src/lib/filter/sysconfig_powerpc.cc  |  38 +++
 gnuradio-core/src/lib/general/gr_math.h            |  44 +++
 gnuradio-core/src/lib/general/gri_fft.cc           |  27 +-
 gnuradio-core/src/lib/runtime/Makefile.am          |  20 +-
 gnuradio-core/src/lib/runtime/gr_block.cc          |   8 +
 gnuradio-core/src/lib/runtime/gr_block.h           |   6 +-
 gnuradio-core/src/lib/runtime/gr_block_detail.h    |  11 +-
 gnuradio-core/src/lib/runtime/gr_block_executor.cc | 329 ++++++++++++++++++++
 gnuradio-core/src/lib/runtime/gr_block_executor.h  |  69 +++++
 gnuradio-core/src/lib/runtime/gr_buffer.cc         |  31 +-
 gnuradio-core/src/lib/runtime/gr_buffer.h          |  71 ++++-
 gnuradio-core/src/lib/runtime/gr_buffer.i          |   8 +-
 gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc |  48 ++-
 gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h  |   9 +-
 gnuradio-core/src/lib/runtime/gr_flowgraph.h       |   4 +-
 .../src/lib/runtime/gr_hier_block2_detail.cc       |   2 +-
 gnuradio-core/src/lib/runtime/gr_scheduler.cc      |  33 ++
 gnuradio-core/src/lib/runtime/gr_scheduler.h       |  64 ++++
 gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc  |  87 ++++++
 gnuradio-core/src/lib/runtime/gr_scheduler_sts.h   |  62 ++++
 .../src/lib/runtime/gr_scheduler_thread.cc         | 110 -------
 .../src/lib/runtime/gr_scheduler_thread.h          |  59 ----
 gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc  |  95 ++++++
 gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h   |  60 ++++
 .../lib/runtime/gr_single_threaded_scheduler.cc    |  12 +-
 gnuradio-core/src/lib/runtime/gr_top_block.cc      |   3 +-
 gnuradio-core/src/lib/runtime/gr_top_block_impl.cc | 135 ++++----
 gnuradio-core/src/lib/runtime/gr_top_block_impl.h  |  32 +-
 .../src/lib/runtime/gr_top_block_impl_sts.cc       | 128 --------
 .../src/lib/runtime/gr_top_block_impl_sts.h        |  55 ----
 gnuradio-core/src/lib/runtime/gr_tpb_detail.cc     |  67 ++++
 gnuradio-core/src/lib/runtime/gr_tpb_detail.h      |  81 +++++
 .../src/lib/runtime/gr_tpb_thread_body.cc          |  76 +++++
 gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h |  45 +++
 gnuradio-core/src/lib/runtime/qa_gr_buffer.cc      |  14 +-
 gnuradio-core/src/python/gnuradio/gr/top_block.py  |  33 +-
 gnuradio-core/src/tests/Makefile.am                |   4 +-
 54 files changed, 2527 insertions(+), 652 deletions(-)
 create mode 100644 gnuradio-core/src/lib/filter/dotprod_fff_altivec.c
 create mode 100644 gnuradio-core/src/lib/filter/dotprod_fff_altivec.h
 create mode 100644 gnuradio-core/src/lib/filter/gr_altivec.c
 create mode 100644 gnuradio-core/src/lib/filter/gr_altivec.h
 delete mode 100644 gnuradio-core/src/lib/filter/gr_cpu.cc
 create mode 100644 gnuradio-core/src/lib/filter/gr_cpu_powerpc.cc
 create mode 100644 gnuradio-core/src/lib/filter/gr_cpu_x86.cc
 create mode 100644 gnuradio-core/src/lib/filter/gr_fir_fff_altivec.cc
 create mode 100644 gnuradio-core/src/lib/filter/gr_fir_fff_altivec.h
 create mode 100644 gnuradio-core/src/lib/filter/gr_fir_sysconfig_powerpc.cc
 create mode 100644 gnuradio-core/src/lib/filter/gr_fir_sysconfig_powerpc.h
 create mode 100644 gnuradio-core/src/lib/filter/qa_dotprod_powerpc.cc
 create mode 100644 gnuradio-core/src/lib/filter/sysconfig_powerpc.cc
 create mode 100644 gnuradio-core/src/lib/runtime/gr_block_executor.cc
 create mode 100644 gnuradio-core/src/lib/runtime/gr_block_executor.h
 create mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler.cc
 create mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler.h
 create mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc
 create mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler_sts.h
 delete mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler_thread.cc
 delete mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler_thread.h
 create mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc
 create mode 100644 gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h
 delete mode 100644 gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.cc
 delete mode 100644 gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.h
 create mode 100644 gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
 create mode 100644 gnuradio-core/src/lib/runtime/gr_tpb_detail.h
 create mode 100644 gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
 create mode 100644 gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h

(limited to 'gnuradio-core/src')

diff --git a/gnuradio-core/src/lib/Makefile.am b/gnuradio-core/src/lib/Makefile.am
index 66567048cb..e4646cfcc2 100644
--- a/gnuradio-core/src/lib/Makefile.am
+++ b/gnuradio-core/src/lib/Makefile.am
@@ -36,6 +36,8 @@ libgnuradio_core_la_LDFLAGS = $(NO_UNDEFINED) -version-info 0:0:0
 libgnuradio_core_qa_la_SOURCES = bug_work_around_6.cc
 libgnuradio_core_qa_la_LDFLAGS = $(NO_UNDEFINED) -version-info 0:0:0 \
 				 $(LIBGNURADIO_CORE_EXTRA_LDFLAGS)
+
+
 libgnuradio_core_la_LIBADD  = 		\
 	filter/libfilter.la		\
 	g72x/libccitt.la		\
@@ -47,6 +49,7 @@ libgnuradio_core_la_LIBADD  = 		\
 	reed-solomon/librs.la		\
 	runtime/libruntime.la		\
 	$(OMNITHREAD_LA)		\
+	$(GRUEL_LA)			\
 	$(FFTW3F_LIBS)
 
 libgnuradio_core_qa_la_LIBADD  = 	\
diff --git a/gnuradio-core/src/lib/filter/Makefile.am b/gnuradio-core/src/lib/filter/Makefile.am
index 1ddb84415f..6694616320 100644
--- a/gnuradio-core/src/lib/filter/Makefile.am
+++ b/gnuradio-core/src/lib/filter/Makefile.am
@@ -117,7 +117,7 @@ generic_qa_CODE =			\
 x86_CODE = 				\
 	sysconfig_x86.cc		\
 	gr_fir_sysconfig_x86.cc		\
-	gr_cpu.cc			\
+	gr_cpu_x86.cc			\
 	gr_fir_ccc_simd.cc		\
 	gr_fir_ccc_x86.cc		\
 	gr_fir_fff_simd.cc		\
@@ -166,6 +166,18 @@ x86_qa_CODE = 				\
 	qa_complex_dotprod_x86.cc	\
 	qa_ccomplex_dotprod_x86.cc	
 
+powerpc_CODE = \
+	sysconfig_powerpc.cc \
+	gr_fir_sysconfig_powerpc.cc \
+	gr_cpu_powerpc.cc \
+	gr_fir_fff_altivec.cc \
+	gr_altivec.c \
+	dotprod_fff_altivec.c
+
+powerpc_qa_CODE = \
+	qa_dotprod_powerpc.cc
+
+
 #
 # include each <foo>_CODE entry here...
 #
@@ -175,7 +187,9 @@ EXTRA_libfilter_la_SOURCES =		\
 	$(x86_CODE)			\
 	$(x86_SUBCODE)			\
 	$(x86_64_SUBCODE)		\
-	$(x86_qa_CODE)
+	$(x86_qa_CODE)			\
+	$(powerpc_CODE)			\
+	$(powerpc_qa_CODE)
 
 
 EXTRA_DIST = 					\
@@ -234,6 +248,11 @@ endif
 libfilter_qa_la_SOURCES = $(libfilter_qa_la_common_SOURCES) $(x86_qa_CODE)
 endif
 
+if MD_CPU_powerpc
+libfilter_la_SOURCES = $(libfilter_la_common_SOURCES) $(powerpc_CODE)
+libfilter_qa_la_SOURCES = $(libfilter_qa_la_common_SOURCES) $(powerpc_qa_CODE)
+endif
+
 
 grinclude_HEADERS = 			\
 	$(GENERATED_H)			\
@@ -245,12 +264,14 @@ grinclude_HEADERS = 			\
 	float_dotprod_generic.h		\
 	float_dotprod_x86.h		\
 	gr_adaptive_fir_ccf.h		\
+	gr_altivec.h			\
 	gr_cma_equalizer_cc.h		\
 	gr_cpu.h			\
 	gr_fft_filter_ccc.h		\
 	gr_fft_filter_fff.h		\
 	gr_filter_delay_fc.h		\
 	gr_fir_sysconfig_x86.h		\
+	gr_fir_sysconfig_powerpc.h	\
 	gr_fractional_interpolator_ff.h	\
 	gr_fractional_interpolator_cc.h	\
 	gr_goertzel_fc.h		\
@@ -272,6 +293,7 @@ grinclude_HEADERS = 			\
 
 noinst_HEADERS = 			\
 	assembly.h			\
+	dotprod_fff_altivec.h		\
 	gr_fir_scc_simd.h		\
 	gr_fir_scc_x86.h		\
 	gr_fir_fcc_simd.h		\
@@ -280,6 +302,7 @@ noinst_HEADERS = 			\
 	gr_fir_ccf_x86.h		\
 	gr_fir_ccc_simd.h		\
 	gr_fir_ccc_x86.h		\
+	gr_fir_fff_altivec.h		\
 	gr_fir_fff_simd.h		\
 	gr_fir_fff_x86.h		\
 	gr_fir_fsf_simd.h		\
diff --git a/gnuradio-core/src/lib/filter/dotprod_fff_altivec.c b/gnuradio-core/src/lib/filter/dotprod_fff_altivec.c
new file mode 100644
index 0000000000..ebddeb502d
--- /dev/null
+++ b/gnuradio-core/src/lib/filter/dotprod_fff_altivec.c
@@ -0,0 +1,162 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <dotprod_fff_altivec.h>
+#include <gr_altivec.h>
+
+/*!
+ * \param x any value
+ * \param pow2 must be a power of 2
+ * \returns \p x rounded down to a multiple of \p pow2.
+ */
+static inline size_t
+gr_p2_round_down(size_t x, size_t pow2)
+{
+  return x & -pow2;
+}
+
+
+#if 0
+
+float
+dotprod_fff_altivec(const float *a, const float *b, size_t n)
+{
+  float	sum = 0;
+  for (size_t i = 0; i < n; i++){
+    sum += a[i] * b[i];
+  }
+  return sum;
+}
+
+#else
+
+/*
+ *  preconditions:
+ *
+ *    n > 0 and a multiple of 4
+ *    a   4-byte aligned
+ *    b  16-byte aligned
+ */
+float
+dotprod_fff_altivec(const float *_a, const float *_b, size_t n)
+{
+  const vector float *a = (const vector float *) _a;
+  const vector float *b = (const vector float *) _b;
+
+  static const size_t UNROLL_CNT = 4;
+
+  n = gr_p2_round_down(n, 4);
+  size_t loop_cnt = n / (UNROLL_CNT * FLOATS_PER_VEC);
+  size_t nleft = n % (UNROLL_CNT * FLOATS_PER_VEC);
+
+  // printf("n = %zd, loop_cnt = %zd, nleft = %zd\n", n, loop_cnt, nleft);
+
+  // Used with vperm to build a* from p*
+  vector unsigned char lvsl_a = vec_lvsl(0, _a);
+
+  vector float p0, p1, p2, p3;
+  vector float a0, a1, a2, a3;
+  vector float b0, b1, b2, b3;
+  vector float acc0 = {0, 0, 0, 0};
+  vector float acc1 = {0, 0, 0, 0};
+  vector float acc2 = {0, 0, 0, 0};
+  vector float acc3 = {0, 0, 0, 0};
+
+  // wind in
+
+  p0 = vec_ld(0*VS, a);
+  p1 = vec_ld(1*VS, a);
+  p2 = vec_ld(2*VS, a);
+  p3 = vec_ld(3*VS, a);
+  a += UNROLL_CNT;
+
+  a0 = vec_perm(p0, p1, lvsl_a);
+  b0 = vec_ld(0*VS, b);
+  p0 = vec_ld(0*VS, a);
+
+  size_t i;
+  for (i = 0; i < loop_cnt; i++){
+
+    a1 = vec_perm(p1, p2, lvsl_a);
+    b1 = vec_ld(1*VS, b);
+    p1 = vec_ld(1*VS, a);
+    acc0 = vec_madd(a0, b0, acc0);
+
+    a2 = vec_perm(p2, p3, lvsl_a);
+    b2 = vec_ld(2*VS, b);
+    p2 = vec_ld(2*VS, a);
+    acc1 = vec_madd(a1, b1, acc1);
+
+    a3 = vec_perm(p3, p0, lvsl_a);
+    b3 = vec_ld(3*VS, b);
+    p3 = vec_ld(3*VS, a);
+    acc2 = vec_madd(a2, b2, acc2);
+
+    a += UNROLL_CNT;
+    b += UNROLL_CNT;
+
+    a0 = vec_perm(p0, p1, lvsl_a);
+    b0 = vec_ld(0*VS, b);
+    p0 = vec_ld(0*VS, a);
+    acc3 = vec_madd(a3, b3, acc3);
+  }
+
+  /*
+   * The compiler ought to be able to figure out that 0, 4, 8 and 12
+   * are the only possible values for nleft.
+   */
+  switch (nleft){
+  case 0:
+    break;
+    
+  case 4:
+    acc0 = vec_madd(a0, b0, acc0);
+    break;
+
+  case 8:
+    a1 = vec_perm(p1, p2, lvsl_a);
+    b1 = vec_ld(1*VS, b);
+    acc0 = vec_madd(a0, b0, acc0);
+    acc1 = vec_madd(a1, b1, acc1);
+    break;
+
+  case 12:
+    a1 = vec_perm(p1, p2, lvsl_a);
+    b1 = vec_ld(1*VS, b);
+    acc0 = vec_madd(a0, b0, acc0);
+    a2 = vec_perm(p2, p3, lvsl_a);
+    b2 = vec_ld(2*VS, b);
+    acc1 = vec_madd(a1, b1, acc1);
+    acc2 = vec_madd(a2, b2, acc2);
+    break;
+  }
+	    
+  acc0 = acc0 + acc1;
+  acc2 = acc2 + acc3;
+  acc0 = acc0 + acc2;
+
+  return horizontal_add_f(acc0);
+}
+
+#endif
diff --git a/gnuradio-core/src/lib/filter/dotprod_fff_altivec.h b/gnuradio-core/src/lib/filter/dotprod_fff_altivec.h
new file mode 100644
index 0000000000..d9ee52cf06
--- /dev/null
+++ b/gnuradio-core/src/lib/filter/dotprod_fff_altivec.h
@@ -0,0 +1,49 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_DOTPROD_FFF_ALTIVEC_H
+#define INCLUDED_DOTPROD_FFF_ALTIVEC_H
+
+#include <stddef.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/*!
+ * <pre>
+ *
+ *  preconditions:
+ *
+ *    n > 0 and a multiple of 4
+ *    a   4-byte aligned
+ *    b  16-byte aligned
+ *
+ * </pre>
+ */
+float 
+dotprod_fff_altivec(const float *a, const float *b, size_t n);
+
+#ifdef __cplusplus
+}
+#endif
+
+
+#endif /* INCLUDED_DOTPROD_FFF_ALTIVEC_H */
diff --git a/gnuradio-core/src/lib/filter/gr_altivec.c b/gnuradio-core/src/lib/filter/gr_altivec.c
new file mode 100644
index 0000000000..01ca95f0de
--- /dev/null
+++ b/gnuradio-core/src/lib/filter/gr_altivec.c
@@ -0,0 +1,38 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#include <gr_altivec.h>
+
+void
+gr_print_vector_float(FILE *fp, vector float v)
+{
+  union v_float_u	u;
+  u.v = v;
+  fprintf(fp, "{ %f, %f, %f, %f }", u.f[0], u.f[1], u.f[2], u.f[3]);
+}
+  
+void
+gr_pvf(FILE *fp, const char *label, vector float v)
+{
+  fprintf(fp, "%s = ", label);
+  gr_print_vector_float(fp, v);
+  putc('\n', fp);
+}
diff --git a/gnuradio-core/src/lib/filter/gr_altivec.h b/gnuradio-core/src/lib/filter/gr_altivec.h
new file mode 100644
index 0000000000..176579a6af
--- /dev/null
+++ b/gnuradio-core/src/lib/filter/gr_altivec.h
@@ -0,0 +1,57 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GR_ALTIVEC_H
+#define INCLUDED_GR_ALTIVEC_H
+
+#include <altivec.h>
+#include <stddef.h>
+#include <stdio.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#define VS             sizeof(vector float)
+#define FLOATS_PER_VEC (sizeof(vector float)/sizeof(float))
+
+union v_float_u {
+  vector float	v;
+  float		f[FLOATS_PER_VEC];
+};
+
+void gr_print_vector_float(FILE *fp, vector float v);
+void gr_pvf(FILE *fp, const char *label, vector float v);
+
+static inline float
+horizontal_add_f(vector float v)
+{
+  union v_float_u u;
+  vector float	  t0 = vec_add(v, vec_sld(v, v, 8));
+  vector float	  t1 = vec_add(t0, vec_sld(t0, t0, 4));
+  u.v = t1;
+  return u.f[0];
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* INCLUDED_GR_ALTIVEC_H */
diff --git a/gnuradio-core/src/lib/filter/gr_cpu.cc b/gnuradio-core/src/lib/filter/gr_cpu.cc
deleted file mode 100644
index 517c10e9ad..0000000000
--- a/gnuradio-core/src/lib/filter/gr_cpu.cc
+++ /dev/null
@@ -1,136 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2002,2008 Free Software Foundation, Inc.
- * 
- * This file is part of GNU Radio
- * 
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- * 
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- * 
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING.  If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#include <gr_cpu.h>
-
-/*
- * execute CPUID instruction, return EAX, EBX, ECX and EDX values in result
- */
-extern "C" {
-void cpuid_x86 (unsigned int op, unsigned int result[4]);
-};
-
-/*
- * CPUID functions returning a single datum
- */
-
-static inline unsigned int cpuid_eax(unsigned int op)
-{
-  unsigned int	regs[4];
-  cpuid_x86 (op, regs);
-  return regs[0];
-}
-
-static inline unsigned int cpuid_ebx(unsigned int op)
-{
-  unsigned int	regs[4];
-  cpuid_x86 (op, regs);
-  return regs[1];
-}
-
-static inline unsigned int cpuid_ecx(unsigned int op)
-{
-  unsigned int	regs[4];
-  cpuid_x86 (op, regs);
-  return regs[2];
-}
-
-static inline unsigned int cpuid_edx(unsigned int op)
-{
-  unsigned int	regs[4];
-  cpuid_x86 (op, regs);
-  return regs[3];
-}
-
-// ----------------------------------------------------------------
-
-bool
-gr_cpu::has_mmx ()
-{
-  unsigned int edx = cpuid_edx (1);	// standard features
-  return (edx & (1 << 23)) != 0;
-}
-
-bool
-gr_cpu::has_sse ()
-{
-  unsigned int edx = cpuid_edx (1);	// standard features
-  return (edx & (1 << 25)) != 0;
-}
-
-bool
-gr_cpu::has_sse2 ()
-{
-  unsigned int edx = cpuid_edx (1);	// standard features
-  return (edx & (1 << 26)) != 0;
-}
-
-bool
-gr_cpu::has_sse3 ()
-{
-  unsigned int ecx = cpuid_ecx (1);	// standard features
-  return (ecx & (1 << 0)) != 0;
-}
-
-bool
-gr_cpu::has_ssse3 ()
-{
-  unsigned int ecx = cpuid_ecx (1);	// standard features
-  return (ecx & (1 << 9)) != 0;
-}
-
-bool
-gr_cpu::has_sse4_1 ()
-{
-  unsigned int ecx = cpuid_ecx (1);	// standard features
-  return (ecx & (1 << 19)) != 0;
-}
-
-bool
-gr_cpu::has_sse4_2 ()
-{
-  unsigned int ecx = cpuid_ecx (1);	// standard features
-  return (ecx & (1 << 20)) != 0;
-}
-
-
-bool
-gr_cpu::has_3dnow ()
-{
-  unsigned int extended_fct_count = cpuid_eax (0x80000000);
-  if (extended_fct_count < 0x80000001)
-    return false;
-
-  unsigned int extended_features = cpuid_edx (0x80000001);
-  return (extended_features & (1 << 31)) != 0;
-}
-
-bool
-gr_cpu::has_3dnowext ()
-{
-  unsigned int extended_fct_count = cpuid_eax (0x80000000);
-  if (extended_fct_count < 0x80000001)
-    return false;
-
-  unsigned int extended_features = cpuid_edx (0x80000001);
-  return (extended_features & (1 << 30)) != 0;
-}
diff --git a/gnuradio-core/src/lib/filter/gr_cpu.h b/gnuradio-core/src/lib/filter/gr_cpu.h
index 5967d98685..ef10beae1f 100644
--- a/gnuradio-core/src/lib/filter/gr_cpu.h
+++ b/gnuradio-core/src/lib/filter/gr_cpu.h
@@ -33,6 +33,7 @@ struct gr_cpu {
   static bool has_sse4_2 ();
   static bool has_3dnow ();
   static bool has_3dnowext ();
+  static bool has_altivec ();
 };
 
 #endif /* _GR_CPU_H_ */
diff --git a/gnuradio-core/src/lib/filter/gr_cpu_powerpc.cc b/gnuradio-core/src/lib/filter/gr_cpu_powerpc.cc
new file mode 100644
index 0000000000..35c342aaa7
--- /dev/null
+++ b/gnuradio-core/src/lib/filter/gr_cpu_powerpc.cc
@@ -0,0 +1,59 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2002,2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include <gr_cpu.h>
+
+bool
+gr_cpu::has_mmx ()
+{
+  return false;
+}
+
+bool
+gr_cpu::has_sse ()
+{
+  return false;
+}
+
+bool
+gr_cpu::has_sse2 ()
+{
+  return false;
+}
+
+bool
+gr_cpu::has_3dnow ()
+{
+  return false;
+}
+
+bool
+gr_cpu::has_3dnowext ()
+{
+  return false;
+}
+
+bool
+gr_cpu::has_altivec ()
+{
+  return true;		// FIXME assume we've always got it
+}
diff --git a/gnuradio-core/src/lib/filter/gr_cpu_x86.cc b/gnuradio-core/src/lib/filter/gr_cpu_x86.cc
new file mode 100644
index 0000000000..a13a69c06f
--- /dev/null
+++ b/gnuradio-core/src/lib/filter/gr_cpu_x86.cc
@@ -0,0 +1,113 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2002 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include <gr_cpu.h>
+
+/*
+ * execute CPUID instruction, return EAX, EBX, ECX and EDX values in result
+ */
+extern "C" {
+void cpuid_x86 (unsigned int op, unsigned int result[4]);
+};
+
+/*
+ * CPUID functions returning a single datum
+ */
+
+static inline unsigned int cpuid_eax(unsigned int op)
+{
+  unsigned int	regs[4];
+  cpuid_x86 (op, regs);
+  return regs[0];
+}
+
+static inline unsigned int cpuid_ebx(unsigned int op)
+{
+  unsigned int	regs[4];
+  cpuid_x86 (op, regs);
+  return regs[1];
+}
+
+static inline unsigned int cpuid_ecx(unsigned int op)
+{
+  unsigned int	regs[4];
+  cpuid_x86 (op, regs);
+  return regs[2];
+}
+
+static inline unsigned int cpuid_edx(unsigned int op)
+{
+  unsigned int	regs[4];
+  cpuid_x86 (op, regs);
+  return regs[3];
+}
+
+// ----------------------------------------------------------------
+
+bool
+gr_cpu::has_mmx ()
+{
+  unsigned int edx = cpuid_edx (1);	// standard features
+  return (edx & (1 << 23)) != 0;
+}
+
+bool
+gr_cpu::has_sse ()
+{
+  unsigned int edx = cpuid_edx (1);	// standard features
+  return (edx & (1 << 25)) != 0;
+}
+
+bool
+gr_cpu::has_sse2 ()
+{
+  unsigned int edx = cpuid_edx (1);	// standard features
+  return (edx & (1 << 26)) != 0;
+}
+
+bool
+gr_cpu::has_3dnow ()
+{
+  unsigned int extended_fct_count = cpuid_eax (0x80000000);
+  if (extended_fct_count < 0x80000001)
+    return false;
+
+  unsigned int extended_features = cpuid_edx (0x80000001);
+  return (extended_features & (1 << 31)) != 0;
+}
+
+bool
+gr_cpu::has_3dnowext ()
+{
+  unsigned int extended_fct_count = cpuid_eax (0x80000000);
+  if (extended_fct_count < 0x80000001)
+    return false;
+
+  unsigned int extended_features = cpuid_edx (0x80000001);
+  return (extended_features & (1 << 30)) != 0;
+}
+
+bool
+gr_cpu::has_altivec ()
+{
+  return false;
+}
diff --git a/gnuradio-core/src/lib/filter/gr_fir_fff_altivec.cc b/gnuradio-core/src/lib/filter/gr_fir_fff_altivec.cc
new file mode 100644
index 0000000000..7583f5c1fb
--- /dev/null
+++ b/gnuradio-core/src/lib/filter/gr_fir_fff_altivec.cc
@@ -0,0 +1,83 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_fir_fff_altivec.h>
+#include <stdlib.h>
+#include <stdexcept>
+#include <assert.h>
+#include <gr_math.h>
+#include <gr_altivec.h>
+#include <dotprod_fff_altivec.h>
+
+gr_fir_fff_altivec::gr_fir_fff_altivec()
+  : gr_fir_fff_generic(),
+    d_naligned_taps(0), d_aligned_taps(0)
+{
+}
+
+gr_fir_fff_altivec::gr_fir_fff_altivec (const std::vector<float> &new_taps)
+  : gr_fir_fff_generic(new_taps),
+    d_naligned_taps(0), d_aligned_taps(0)
+{
+  set_taps(new_taps);
+}
+
+gr_fir_fff_altivec::~gr_fir_fff_altivec()
+{
+  if (d_aligned_taps){
+    free(d_aligned_taps);
+    d_aligned_taps = 0;
+  }
+}
+
+void
+gr_fir_fff_altivec::set_taps(const std::vector<float> &inew_taps)
+{
+  gr_fir_fff_generic::set_taps(inew_taps);	// call superclass
+  d_naligned_taps = gr_p2_round_up(ntaps(), FLOATS_PER_VEC);
+
+  if (d_aligned_taps){
+    free(d_aligned_taps);
+    d_aligned_taps = 0;
+  }
+  void *p;
+  int r = posix_memalign(&p,  sizeof(vector float), d_naligned_taps * sizeof(d_aligned_taps[0]));
+  if (r != 0){
+    throw std::bad_alloc();
+  }
+  d_aligned_taps = (float *) p;
+  memcpy(d_aligned_taps, &d_taps[0], ntaps() * sizeof(d_aligned_taps[0]));
+  for (size_t i = ntaps(); i < d_naligned_taps; i++)
+    d_aligned_taps[i] = 0.0;
+}
+
+
+float 
+gr_fir_fff_altivec::filter (const float input[])
+{
+  if (d_naligned_taps == 0)
+    return 0.0;
+  
+  return dotprod_fff_altivec(input, d_aligned_taps, d_naligned_taps);
+}
diff --git a/gnuradio-core/src/lib/filter/gr_fir_fff_altivec.h b/gnuradio-core/src/lib/filter/gr_fir_fff_altivec.h
new file mode 100644
index 0000000000..1694f55243
--- /dev/null
+++ b/gnuradio-core/src/lib/filter/gr_fir_fff_altivec.h
@@ -0,0 +1,45 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GR_FIR_FFF_ALTIVEC_H
+#define INCLUDED_GR_FIR_FFF_ALTIVEC_H
+
+#include <gr_fir_fff_generic.h>
+
+/*!
+ * \brief altivec version of gr_fir_fff
+ */
+class gr_fir_fff_altivec : public gr_fir_fff_generic
+{
+protected:
+
+  size_t    d_naligned_taps;  // number of taps (multiple of 4)
+  float	   *d_aligned_taps;   // 16-byte aligned, and zero padded to multiple of 4
+
+public:
+  gr_fir_fff_altivec();
+  gr_fir_fff_altivec(const std::vector<float> &taps);
+  ~gr_fir_fff_altivec();
+
+  virtual void set_taps (const std::vector<float> &taps);
+  virtual float filter (const float input[]);
+};
+
+#endif /* INCLUDED_GR_FIR_FFF_ALTIVEC_H */
diff --git a/gnuradio-core/src/lib/filter/gr_fir_sysconfig_powerpc.cc b/gnuradio-core/src/lib/filter/gr_fir_sysconfig_powerpc.cc
new file mode 100644
index 0000000000..34d3f81355
--- /dev/null
+++ b/gnuradio-core/src/lib/filter/gr_fir_sysconfig_powerpc.cc
@@ -0,0 +1,340 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2002,2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_fir_sysconfig_powerpc.h>
+#include <gr_cpu.h>
+
+#include <gr_fir_ccf.h>
+#include <gr_fir_ccf_generic.h>
+//#include <gr_fir_ccf_altivec.h>
+#include <gr_fir_fcc.h>
+#include <gr_fir_fcc_generic.h>
+//#include <gr_fir_fcc_altivec.h>
+#include <gr_fir_fff.h>
+#include <gr_fir_fff_generic.h>
+#include <gr_fir_fff_altivec.h>
+#include <gr_fir_fsf.h>
+#include <gr_fir_fsf_generic.h>
+//#include <gr_fir_fsf_powerpc.h>
+#include <gr_fir_ccc.h>
+#include <gr_fir_ccc_generic.h>
+//#include <gr_fir_ccc_altivec.h>
+#include <gr_fir_scc.h>
+#include <gr_fir_scc_generic.h>
+//#include <gr_fir_scc_altivec.h>
+
+#include <iostream>
+using std::cerr;
+
+/*
+ * ----------------------------------------------------------------
+ * static functions that serve as constructors...
+ * ----------------------------------------------------------------
+ */
+
+#if 0
+static gr_fir_ccf *
+make_gr_fir_ccf_altivec(const std::vector<float> &taps)
+{
+  return new gr_fir_ccf_altivec(taps);
+}
+
+static gr_fir_fcc *
+make_gr_fir_fcc_altivec(const std::vector<gr_complex> &taps)
+{
+  return new gr_fir_fcc_altivec(taps);
+}
+
+static gr_fir_ccc *
+make_gr_fir_ccc_altivec (const std::vector<gr_complex> &taps)
+{
+  return new gr_fir_ccc_altivec (taps);
+}
+#endif
+
+static gr_fir_fff *
+make_gr_fir_fff_altivec (const std::vector<float> &taps)
+{
+  return new gr_fir_fff_altivec (taps);
+}
+
+#if 0
+static gr_fir_fsf *
+make_gr_fir_fsf_altivec (const std::vector<float> &taps)
+{
+  return new gr_fir_fsf_altivec (taps);
+}
+
+static gr_fir_scc *
+make_gr_fir_scc_altivec(const std::vector<gr_complex> &taps)
+{
+  return new gr_fir_scc_altivec(taps);
+}
+#endif
+
+/*
+ * ----------------------------------------------------------------
+ * Return instances of the fastest powerpc versions of these classes.
+ *
+ * check CPUID, if has altivec, return altivec version,
+ *		else return generic version.
+ * ----------------------------------------------------------------
+ */
+
+gr_fir_ccf *
+gr_fir_sysconfig_powerpc::create_gr_fir_ccf (const std::vector<float> &taps)
+{
+  static bool first = true;
+
+#if 0
+  if (gr_cpu::has_altivec ()){
+    if (first){
+      cerr << ">>> gr_fir_ccf: using altivec\n";
+      first = false;
+    }
+    return make_gr_fir_ccf_altivec (taps);
+  }
+#endif
+
+  if (0 && first){
+    cerr << ">>> gr_fir_ccf: handing off to parent class\n";
+    first = false;
+  }
+  return gr_fir_sysconfig_generic::create_gr_fir_ccf (taps);
+}
+
+gr_fir_fcc *
+gr_fir_sysconfig_powerpc::create_gr_fir_fcc (const std::vector<gr_complex> &taps)
+{
+  static bool first = true;
+
+#if 0
+  if (gr_cpu::has_altivec ()){
+    if (first){
+      cerr << ">>> gr_fir_fcc: using altivec\n";
+      first = false;
+    }
+    return make_gr_fir_fcc_altivec (taps);
+  }
+#endif
+
+  if (0 && first){
+    cerr << ">>> gr_fir_fcc: handing off to parent class\n";
+    first = false;
+  }
+  return gr_fir_sysconfig_generic::create_gr_fir_fcc (taps);
+}
+
+gr_fir_ccc *
+gr_fir_sysconfig_powerpc::create_gr_fir_ccc (const std::vector<gr_complex> &taps)
+{
+  static bool first = true;
+
+#if 0
+  if (gr_cpu::has_altivec ()){
+    if (first){
+      cerr << ">>> gr_fir_ccc: using altivec\n";
+      first = false;
+    }
+    return make_gr_fir_ccc_altivec (taps);
+  }
+#endif
+  
+  if (0 && first){
+    cerr << ">>> gr_fir_ccc: handing off to parent class\n";
+    first = false;
+  }
+  return gr_fir_sysconfig_generic::create_gr_fir_ccc (taps);
+}
+
+gr_fir_fff *
+gr_fir_sysconfig_powerpc::create_gr_fir_fff (const std::vector<float> &taps)
+{
+  static bool first = true;
+
+  if (gr_cpu::has_altivec ()){
+    if (first){
+      cerr << ">>> gr_fir_fff: using altivec\n";
+      first = false;
+    }
+    return make_gr_fir_fff_altivec (taps);
+  }
+  
+  if (0 && first){
+    cerr << ">>> gr_fir_fff: handing off to parent class\n";
+    first = false;
+  }
+  return gr_fir_sysconfig_generic::create_gr_fir_fff (taps);
+}
+
+gr_fir_fsf *
+gr_fir_sysconfig_powerpc::create_gr_fir_fsf (const std::vector<float> &taps)
+{
+  static bool first = true;
+
+#if 0
+  if (gr_cpu::has_altivec ()){
+    if (first){
+      cerr << ">>> gr_fir_fsf: using altivec\n";
+      first = false;
+    }
+    return make_gr_fir_fsf_altivec (taps);
+  }
+#endif
+  
+  if (0 && first){
+    cerr << ">>> gr_fir_fsf: handing off to parent class\n";
+    first = false;
+  }
+  return gr_fir_sysconfig_generic::create_gr_fir_fsf (taps);
+}
+
+
+gr_fir_scc *
+gr_fir_sysconfig_powerpc::create_gr_fir_scc (const std::vector<gr_complex> &taps)
+{
+  static bool first = true;
+
+#if 0
+  if (gr_cpu::has_altivec ()){
+    if (first){
+      cerr << ">>> gr_fir_scc: using altivec\n";
+      first = false;
+    }
+    return make_gr_fir_scc_altivec (taps);
+  }
+#endif
+
+  if (0 && first){
+    cerr << ">>> gr_fir_scc: handing off to parent class\n";
+    first = false;
+  }
+  return gr_fir_sysconfig_generic::create_gr_fir_scc (taps);
+}
+
+/*
+ * ----------------------------------------------------------------
+ *         Return info about available implementations
+ * ----------------------------------------------------------------
+ */
+
+void 
+gr_fir_sysconfig_powerpc::get_gr_fir_ccf_info (std::vector<gr_fir_ccf_info> *info)
+{
+  // invoke parent..
+  gr_fir_sysconfig_generic::get_gr_fir_ccf_info (info);
+
+#if 0  
+  // add our stuff...
+  gr_fir_ccf_info	t;
+  if (gr_cpu::has_altivec ()){
+    t.name = "altivec";
+    t.create = make_gr_fir_ccf_altivec;
+    (*info).push_back (t);
+  }
+#endif
+}
+
+void 
+gr_fir_sysconfig_powerpc::get_gr_fir_fcc_info (std::vector<gr_fir_fcc_info> *info)
+{
+  // invoke parent..
+  gr_fir_sysconfig_generic::get_gr_fir_fcc_info (info);
+
+#if 0
+  // add our stuff...
+  gr_fir_fcc_info	t;
+  if (gr_cpu::has_altivec ()){
+    t.name = "altivec";
+    t.create = make_gr_fir_fcc_altivec;
+    (*info).push_back (t);
+  }
+#endif
+}
+
+void 
+gr_fir_sysconfig_powerpc::get_gr_fir_ccc_info (std::vector<gr_fir_ccc_info> *info)
+{
+  // invoke parent..
+  gr_fir_sysconfig_generic::get_gr_fir_ccc_info (info);
+
+#if 0
+  // add our stuff...
+  gr_fir_ccc_info	t;
+  if (gr_cpu::has_altivec ()){
+    t.name = "altivec";
+    t.create = make_gr_fir_ccc_altivec;
+    (*info).push_back (t);
+  }
+#endif
+}
+
+void 
+gr_fir_sysconfig_powerpc::get_gr_fir_fff_info (std::vector<gr_fir_fff_info> *info)
+{
+  // invoke parent..
+  gr_fir_sysconfig_generic::get_gr_fir_fff_info (info);
+
+  // add our stuff...
+  gr_fir_fff_info	t;
+  if (gr_cpu::has_altivec ()){
+    t.name = "altivec";
+    t.create = make_gr_fir_fff_altivec;
+    (*info).push_back (t);
+  }
+}
+
+void 
+gr_fir_sysconfig_powerpc::get_gr_fir_fsf_info (std::vector<gr_fir_fsf_info> *info)
+{
+  // invoke parent..
+  gr_fir_sysconfig_generic::get_gr_fir_fsf_info (info);
+
+#if 0
+  // add our stuff...
+  gr_fir_fsf_info	t;
+  if (gr_cpu::has_altivec ()){
+    t.name = "altivec";
+    t.create = make_gr_fir_fsf_altivec;
+    (*info).push_back (t);
+  }
+#endif
+}
+
+void 
+gr_fir_sysconfig_powerpc::get_gr_fir_scc_info (std::vector<gr_fir_scc_info> *info)
+{
+  // invoke parent..
+  gr_fir_sysconfig_generic::get_gr_fir_scc_info (info);
+
+#if 0
+  // add our stuff...
+  gr_fir_scc_info	t;
+  if (gr_cpu::has_altivec ()){
+    t.name = "altivec";
+    t.create = make_gr_fir_scc_altivec;
+    (*info).push_back (t);
+  }
+#endif
+}
diff --git a/gnuradio-core/src/lib/filter/gr_fir_sysconfig_powerpc.h b/gnuradio-core/src/lib/filter/gr_fir_sysconfig_powerpc.h
new file mode 100644
index 0000000000..9c31cdf1b7
--- /dev/null
+++ b/gnuradio-core/src/lib/filter/gr_fir_sysconfig_powerpc.h
@@ -0,0 +1,46 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2002,2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+#ifndef INCLUDED_GR_FIR_SYSCONFIG_POWERPC_H
+#define INCLUDED_GR_FIR_SYSCONFIG_POWERPC_H
+
+#include <gr_fir_sysconfig_generic.h>
+
+class gr_fir_sysconfig_powerpc : public gr_fir_sysconfig_generic {
+public:
+  virtual gr_fir_ccf *create_gr_fir_ccf (const std::vector<float> &taps);
+  virtual gr_fir_fcc *create_gr_fir_fcc (const std::vector<gr_complex> &taps);
+  virtual gr_fir_fff *create_gr_fir_fff (const std::vector<float> &taps);
+  virtual gr_fir_fsf *create_gr_fir_fsf (const std::vector<float> &taps);
+  virtual gr_fir_scc *create_gr_fir_scc (const std::vector<gr_complex> &taps);
+  virtual gr_fir_ccc *create_gr_fir_ccc (const std::vector<gr_complex> &taps);
+//virtual gr_fir_sss *create_gr_fir_sss (const std::vector<short> &taps);
+
+  virtual void get_gr_fir_ccf_info (std::vector<gr_fir_ccf_info> *info);
+  virtual void get_gr_fir_fcc_info (std::vector<gr_fir_fcc_info> *info);
+  virtual void get_gr_fir_fff_info (std::vector<gr_fir_fff_info> *info);
+  virtual void get_gr_fir_fsf_info (std::vector<gr_fir_fsf_info> *info);
+  virtual void get_gr_fir_scc_info (std::vector<gr_fir_scc_info> *info);
+  virtual void get_gr_fir_ccc_info (std::vector<gr_fir_ccc_info> *info);
+//virtual void get_gr_fir_sss_info (std::vector<gr_fir_sss_info> *info);
+};
+
+#endif
diff --git a/gnuradio-core/src/lib/filter/qa_dotprod_powerpc.cc b/gnuradio-core/src/lib/filter/qa_dotprod_powerpc.cc
new file mode 100644
index 0000000000..1b02a79c8f
--- /dev/null
+++ b/gnuradio-core/src/lib/filter/qa_dotprod_powerpc.cc
@@ -0,0 +1,32 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2003 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+#include "qa_dotprod.h"
+
+CppUnit::TestSuite *
+qa_dotprod_suite ()
+{
+  CppUnit::TestSuite *s = new CppUnit::TestSuite ("dotprod");
+
+  // empty test suite
+
+  return s;
+}
diff --git a/gnuradio-core/src/lib/filter/qa_filter.cc b/gnuradio-core/src/lib/filter/qa_filter.cc
index e36fca45d4..e2fa72adda 100644
--- a/gnuradio-core/src/lib/filter/qa_filter.cc
+++ b/gnuradio-core/src/lib/filter/qa_filter.cc
@@ -42,13 +42,13 @@ qa_filter::suite ()
   CppUnit::TestSuite	*s = new CppUnit::TestSuite ("filter");
 
   s->addTest (qa_dotprod_suite ());
-  s->addTest (qa_gri_mmse_fir_interpolator::suite ());
-  s->addTest (qa_gri_mmse_fir_interpolator_cc::suite ());
   s->addTest (qa_gr_fir_fff::suite ());
   s->addTest (qa_gr_fir_ccc::suite ());
   s->addTest (qa_gr_fir_fcc::suite ());
   s->addTest (qa_gr_fir_scc::suite ());
   s->addTest (qa_gr_fir_ccf::suite ());
+  s->addTest (qa_gri_mmse_fir_interpolator::suite ());
+  s->addTest (qa_gri_mmse_fir_interpolator_cc::suite ());
 
   return s;
 }
diff --git a/gnuradio-core/src/lib/filter/qa_gr_fir_fff.cc b/gnuradio-core/src/lib/filter/qa_gr_fir_fff.cc
index 380435bf7c..b921223ed9 100644
--- a/gnuradio-core/src/lib/filter/qa_gr_fir_fff.cc
+++ b/gnuradio-core/src/lib/filter/qa_gr_fir_fff.cc
@@ -143,7 +143,7 @@ ref_dotprod (const i_type input[], const tap_type taps[], int ntaps)
 static void
 test_random_io (fir_maker_t maker)  
 {
-  const int	MAX_TAPS	= 9;
+  const int	MAX_TAPS	= 32;
   const int	OUTPUT_LEN	= 17;
   const int	INPUT_LEN	= MAX_TAPS + OUTPUT_LEN;
 
@@ -187,7 +187,7 @@ test_random_io (fir_maker_t maker)
       
       for (int o = 0; o < ol; o++){
 	CPPUNIT_ASSERT_DOUBLES_EQUAL (expected_output[o], actual_output[o],
-			    fabs (expected_output[o]) * 1e-4);
+				      fabs (expected_output[o]) * 9e-3);
       }
 
       delete f1;
diff --git a/gnuradio-core/src/lib/filter/sysconfig_powerpc.cc b/gnuradio-core/src/lib/filter/sysconfig_powerpc.cc
new file mode 100644
index 0000000000..e2b27815b4
--- /dev/null
+++ b/gnuradio-core/src/lib/filter/sysconfig_powerpc.cc
@@ -0,0 +1,38 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2002,2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_fir_sysconfig_powerpc.h>
+
+gr_fir_sysconfig *
+gr_fir_sysconfig_singleton ()
+{
+  static gr_fir_sysconfig *singleton = 0;
+
+  if (singleton)
+    return singleton;
+
+  singleton = new gr_fir_sysconfig_powerpc ();
+  return singleton;
+}
diff --git a/gnuradio-core/src/lib/general/gr_math.h b/gnuradio-core/src/lib/general/gr_math.h
index 957a0e12aa..f1bd208b8c 100644
--- a/gnuradio-core/src/lib/general/gr_math.h
+++ b/gnuradio-core/src/lib/general/gr_math.h
@@ -174,4 +174,48 @@ static inline unsigned int gr_branchless_quad_45deg_slicer(gr_complex x)
   return gr_branchless_quad_45deg_slicer(x.real(), x.imag());
 }
 
+/*!
+ * \param x any value
+ * \param pow2 must be a power of 2
+ * \returns \p x rounded down to a multiple of \p pow2.
+ */
+static inline size_t
+gr_p2_round_down(size_t x, size_t pow2)
+{
+  return x & -pow2;
+}
+
+/*!
+ * \param x any value
+ * \param pow2 must be a power of 2
+ * \returns \p x rounded up to a multiple of \p pow2.
+ */
+static inline size_t
+gr_p2_round_up(size_t x, size_t pow2)
+{
+  return gr_p2_round_down(x + pow2 - 1, pow2);
+}
+
+/*!
+ * \param x any value
+ * \param pow2 must be a power of 2
+ * \returns \p x modulo \p pow2.
+ */
+static inline size_t
+gr_p2_modulo(size_t x, size_t pow2)
+{
+  return x & (pow2 - 1);
+}
+
+/*!
+ * \param x any value
+ * \param pow2 must be a power of 2
+ * \returns \p pow2 - (\p x modulo \p pow2).
+ */
+static inline size_t
+gr_p2_modulo_neg(size_t x, size_t pow2)
+{
+  return pow2 - gr_p2_modulo(x, pow2);
+}
+
 #endif /* _GR_MATH_H_ */
diff --git a/gnuradio-core/src/lib/general/gri_fft.cc b/gnuradio-core/src/lib/general/gri_fft.cc
index 17ea89e132..f6e28e1d1e 100644
--- a/gnuradio-core/src/lib/general/gri_fft.cc
+++ b/gnuradio-core/src/lib/general/gri_fft.cc
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2003 Free Software Foundation, Inc.
+ * Copyright 2003,2008 Free Software Foundation, Inc.
  * 
  * This file is part of GNU Radio
  * 
@@ -28,6 +28,11 @@
 #include <stdio.h>
 #include <cassert>
 #include <stdexcept>
+#include <boost/thread.hpp>
+
+typedef boost::mutex::scoped_lock scoped_lock;
+static boost::mutex  s_planning_mutex;
+
 
 static char *
 wisdom_filename ()
@@ -80,6 +85,9 @@ gri_fftw_export_wisdom ()
 
 gri_fft_complex::gri_fft_complex (int fft_size, bool forward)
 {
+  // Hold global mutex during plan construction and destruction.
+  scoped_lock	lock(s_planning_mutex);
+
   assert (sizeof (fftwf_complex) == sizeof (gr_complex));
   
   if (fft_size <= 0)
@@ -96,10 +104,6 @@ gri_fft_complex::gri_fft_complex (int fft_size, bool forward)
     throw std::runtime_error ("fftwf_malloc");
   }
 
-  // FIXME If there's ever a chance that the planning functions
-  // will be called in multiple threads, we've got to ensure single
-  // threaded access.  They are not thread-safe.
-  
   gri_fftw_import_wisdom ();	// load prior wisdom from disk
   d_plan = fftwf_plan_dft_1d (fft_size,
 			      reinterpret_cast<fftwf_complex *>(d_inbuf), 
@@ -116,6 +120,9 @@ gri_fft_complex::gri_fft_complex (int fft_size, bool forward)
 
 gri_fft_complex::~gri_fft_complex ()
 {
+  // Hold global mutex during plan construction and destruction.
+  scoped_lock	lock(s_planning_mutex);
+
   fftwf_destroy_plan ((fftwf_plan) d_plan);
   fftwf_free (d_inbuf);
   fftwf_free (d_outbuf);
@@ -131,6 +138,9 @@ gri_fft_complex::execute ()
 
 gri_fft_real_fwd::gri_fft_real_fwd (int fft_size)
 {
+  // Hold global mutex during plan construction and destruction.
+  scoped_lock	lock(s_planning_mutex);
+
   assert (sizeof (fftwf_complex) == sizeof (gr_complex));
   
   if (fft_size <= 0)
@@ -147,10 +157,6 @@ gri_fft_real_fwd::gri_fft_real_fwd (int fft_size)
     throw std::runtime_error ("fftwf_malloc");
   }
 
-  // FIXME If there's ever a chance that the planning functions
-  // will be called in multiple threads, we've got to ensure single
-  // threaded access.  They are not thread-safe.
-  
   gri_fftw_import_wisdom ();	// load prior wisdom from disk
   d_plan = fftwf_plan_dft_r2c_1d (fft_size,
 				  d_inbuf,
@@ -166,6 +172,9 @@ gri_fft_real_fwd::gri_fft_real_fwd (int fft_size)
 
 gri_fft_real_fwd::~gri_fft_real_fwd ()
 {
+  // Hold global mutex during plan construction and destruction.
+  scoped_lock	lock(s_planning_mutex);
+
   fftwf_destroy_plan ((fftwf_plan) d_plan);
   fftwf_free (d_inbuf);
   fftwf_free (d_outbuf);
diff --git a/gnuradio-core/src/lib/runtime/Makefile.am b/gnuradio-core/src/lib/runtime/Makefile.am
index 550031b944..b21b324128 100644
--- a/gnuradio-core/src/lib/runtime/Makefile.am
+++ b/gnuradio-core/src/lib/runtime/Makefile.am
@@ -21,7 +21,7 @@
 
 include $(top_srcdir)/Makefile.common
 
-AM_CPPFLAGS = $(STD_DEFINES_AND_INCLUDES) $(CPPUNIT_INCLUDES) $(WITH_INCLUDES)
+AM_CPPFLAGS = $(STD_DEFINES_AND_INCLUDES) $(CPPUNIT_INCLUDES) $(GRUEL_INCLUDES) $(WITH_INCLUDES)
 
 noinst_LTLIBRARIES = libruntime.la libruntime-qa.la
 
@@ -35,6 +35,7 @@ libruntime_la_SOURCES = 			\
 	gr_flat_flowgraph.cc			\
 	gr_block.cc				\
 	gr_block_detail.cc			\
+	gr_block_executor.cc			\
 	gr_hier_block2.cc			\
 	gr_hier_block2_detail.cc		\
 	gr_buffer.cc				\
@@ -48,16 +49,19 @@ libruntime_la_SOURCES = 			\
 	gr_pagesize.cc				\
 	gr_preferences.cc			\
 	gr_realtime.cc				\
-	gr_scheduler_thread.cc			\
+	gr_scheduler.cc				\
+	gr_scheduler_sts.cc			\
+	gr_scheduler_tpb.cc			\
 	gr_single_threaded_scheduler.cc		\
 	gr_sptr_magic.cc			\
 	gr_sync_block.cc			\
 	gr_sync_decimator.cc			\
 	gr_sync_interpolator.cc			\
+	gr_tmp_path.cc				\
 	gr_top_block.cc				\
 	gr_top_block_impl.cc			\
-	gr_top_block_impl_sts.cc		\
-	gr_tmp_path.cc				\
+	gr_tpb_detail.cc			\
+	gr_tpb_thread_body.cc			\
 	gr_vmcircbuf.cc				\
 	gr_vmcircbuf_mmap_shm_open.cc		\
 	gr_vmcircbuf_mmap_tmpfile.cc		\
@@ -82,6 +86,7 @@ grinclude_HEADERS = 				\
 	gr_flat_flowgraph.h			\
 	gr_block.h				\
 	gr_block_detail.h			\
+	gr_block_executor.h			\
 	gr_hier_block2.h			\
 	gr_hier_block2_detail.h			\
 	gr_buffer.h				\
@@ -97,7 +102,9 @@ grinclude_HEADERS = 				\
 	gr_preferences.h			\
 	gr_realtime.h				\
 	gr_runtime_types.h			\
-	gr_scheduler_thread.h			\
+	gr_scheduler.h				\
+	gr_scheduler_sts.h			\
+	gr_scheduler_tpb.h			\
 	gr_select_handler.h			\
 	gr_single_threaded_scheduler.h		\
 	gr_sptr_magic.h				\
@@ -106,7 +113,8 @@ grinclude_HEADERS = 				\
 	gr_sync_interpolator.h			\
 	gr_top_block.h				\
 	gr_top_block_impl.h			\
-	gr_top_block_impl_sts.h			\
+	gr_tpb_detail.h				\
+	gr_tpb_thread_body.h			\
 	gr_timer.h				\
 	gr_tmp_path.h				\
 	gr_types.h				\
diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc
index 0a8fb92c2d..7c2e9901b0 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -110,3 +110,11 @@ gr_block::fixed_rate_noutput_to_ninput(int noutput)
 {
   throw std::runtime_error("Unimplemented");
 }
+
+std::ostream&
+operator << (std::ostream& os, const gr_block *m)
+{
+  os << "<gr_block " << m->name() << " (" << m->unique_id() << ")>";
+  return os;
+}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h
index 79237ee83b..437b610b45 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -214,9 +214,13 @@ class gr_block : public gr_basic_block {
 typedef std::vector<gr_block_sptr> gr_block_vector_t;
 typedef std::vector<gr_block_sptr>::iterator gr_block_viter_t;
 
-inline gr_block_sptr make_gr_block_sptr(gr_basic_block_sptr p)
+inline gr_block_sptr cast_to_block_sptr(gr_basic_block_sptr p)
 {
   return boost::dynamic_pointer_cast<gr_block, gr_basic_block>(p);
 }
 
+
+std::ostream&
+operator << (std::ostream& os, const gr_block *m);
+
 #endif /* INCLUDED_GR_BLOCK_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_block_detail.h b/gnuradio-core/src/lib/runtime/gr_block_detail.h
index a3b7731c01..2856c402c7 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_block_detail.h
@@ -24,6 +24,7 @@
 #define INCLUDED_GR_BLOCK_DETAIL_H
 
 #include <gr_runtime_types.h>
+#include <gr_tpb_detail.h>
 #include <stdexcept>
 
 /*!
@@ -34,7 +35,6 @@
  * of almost all users of GNU Radio.  This decoupling also means that
  * we can make changes to the guts without having to recompile everything.
  */
-
 class gr_block_detail {
  public:
   ~gr_block_detail ();
@@ -73,8 +73,14 @@ class gr_block_detail {
    */
   void consume_each (int how_many_items);
 
+  /*!
+   * \brief Tell the scheduler \p how_many_items were produced on each output stream.
+   */
   void produce_each (int how_many_items);
 
+
+  gr_tpb_detail			     d_tpb;	// used by thread-per-block scheduler
+
   // ----------------------------------------------------------------------------
 
  private:
@@ -84,8 +90,11 @@ class gr_block_detail {
   std::vector<gr_buffer_sptr>	     d_output;
   bool                               d_done;
 
+
   gr_block_detail (unsigned int ninputs, unsigned int noutputs);
 
+  friend class gr_tpb_detail;
+
   friend gr_block_detail_sptr
   gr_make_block_detail (unsigned int ninputs, unsigned int noutputs);
 };
diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.cc b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
new file mode 100644
index 0000000000..fd3a916d4a
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
@@ -0,0 +1,329 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gr_block_executor.h>
+#include <gr_block.h>
+#include <gr_block_detail.h>
+#include <gr_buffer.h>
+#include <boost/thread.hpp>
+#include <iostream>
+#include <limits>
+#include <assert.h>
+#include <stdio.h>
+
+// must be defined to either 0 or 1
+#define ENABLE_LOGGING 0
+
+#if (ENABLE_LOGGING)
+#define LOG(x) do { x; } while(0)
+#else
+#define LOG(x) do {;} while(0)
+#endif
+
+static int which_scheduler  = 0;
+
+inline static unsigned int
+round_up (unsigned int n, unsigned int multiple)
+{
+  return ((n + multiple - 1) / multiple) * multiple;
+}
+
+inline static unsigned int
+round_down (unsigned int n, unsigned int multiple)
+{
+  return (n / multiple) * multiple;
+}
+
+//
+// Return minimum available write space in all our downstream buffers
+// or -1 if we're output blocked and the output we're blocked
+// on is done.
+//
+static int
+min_available_space (gr_block_detail *d, int output_multiple)
+{
+  int	min_space = std::numeric_limits<int>::max();
+
+  for (int i = 0; i < d->noutputs (); i++){
+    gr_buffer::scoped_lock guard(*d->output(i)->mutex());
+#if 0
+    int n = round_down(d->output(i)->space_available(), output_multiple);
+#else
+    int n = round_down(std::min(d->output(i)->space_available(),
+				d->output(i)->bufsize()/2),
+		       output_multiple);
+#endif
+    if (n == 0){			// We're blocked on output.
+      if (d->output(i)->done()){	// Downstream is done, therefore we're done.
+	return -1;
+      }
+      return 0;
+    }
+    min_space = std::min (min_space, n);
+  }
+  return min_space;
+}
+
+
+
+gr_block_executor::gr_block_executor (gr_block_sptr block)
+  : d_block(block), d_log(0)
+{
+  if (ENABLE_LOGGING){
+    char name[100];
+    snprintf(name, sizeof(name), "sst-%03d.log", which_scheduler++);
+    d_log = new std::ofstream(name);
+    std::unitbuf(*d_log);		// make it unbuffered...
+    *d_log << "gr_block_executor: "
+	   << d_block << std::endl;
+  }
+
+  d_block->start();			// enable any drivers, etc.
+}
+
+gr_block_executor::~gr_block_executor ()
+{
+  if (ENABLE_LOGGING)
+    delete d_log;
+
+  d_block->stop();			// stop any drivers, etc.
+}
+
+gr_block_executor::state
+gr_block_executor::run_one_iteration()
+{
+  int			noutput_items;
+  int			max_items_avail;
+
+  gr_block		*m = d_block.get();
+  gr_block_detail	*d = m->detail().get();
+
+  LOG(*d_log << std::endl << m);
+
+  if (d->done()){
+    assert(0);
+    return DONE;
+  }
+
+  if (d->source_p ()){
+    d_ninput_items_required.resize (0);
+    d_ninput_items.resize (0);
+    d_input_items.resize (0);
+    d_input_done.resize(0);
+    d_output_items.resize (d->noutputs ());
+
+    // determine the minimum available output space
+    noutput_items = min_available_space (d, m->output_multiple ());
+    LOG(*d_log << " source\n  noutput_items = " << noutput_items << std::endl);
+    if (noutput_items == -1)		// we're done
+      goto were_done;
+
+    if (noutput_items == 0){		// we're output blocked
+      LOG(*d_log << "  BLKD_OUT\n");
+      return BLKD_OUT;
+    }
+
+    goto setup_call_to_work;		// jump to common code
+  }
+
+  else if (d->sink_p ()){
+    d_ninput_items_required.resize (d->ninputs ());
+    d_ninput_items.resize (d->ninputs ());
+    d_input_items.resize (d->ninputs ());
+    d_input_done.resize(d->ninputs());
+    d_output_items.resize (0);
+    LOG(*d_log << " sink\n");
+
+    max_items_avail = 0;
+    for (int i = 0; i < d->ninputs (); i++){
+      {
+	/*
+	 * Acquire the mutex and grab local copies of items_available and done.
+	 */
+	gr_buffer::scoped_lock guard(*d->input(i)->mutex());
+	d_ninput_items[i] = d->input(i)->items_available();
+	d_input_done[i] = d->input(i)->done();
+      }
+
+      LOG(*d_log << "  d_ninput_items[" << i << "] = " << d_ninput_items[i] << std::endl);
+      LOG(*d_log << "  d_input_done[" << i << "] = " << d_input_done[i] << std::endl);
+      
+      if (d_ninput_items[i] < m->output_multiple() && d_input_done[i])
+	goto were_done;
+	
+      max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
+    }
+
+    // take a swag at how much output we can sink
+    noutput_items = (int) (max_items_avail * m->relative_rate ());
+    noutput_items = round_down (noutput_items, m->output_multiple ());
+    LOG(*d_log << "  max_items_avail = " << max_items_avail << std::endl);
+    LOG(*d_log << "  noutput_items = " << noutput_items << std::endl);
+
+    if (noutput_items == 0){	// we're blocked on input
+      LOG(*d_log << "  BLKD_IN\n");
+      return BLKD_IN;
+    }
+
+    goto try_again;		// Jump to code shared with regular case.
+  }
+
+  else {
+    // do the regular thing
+    d_ninput_items_required.resize (d->ninputs ());
+    d_ninput_items.resize (d->ninputs ());
+    d_input_items.resize (d->ninputs ());
+    d_input_done.resize(d->ninputs());
+    d_output_items.resize (d->noutputs ());
+
+    max_items_avail = 0;
+    for (int i = 0; i < d->ninputs (); i++){
+      {
+	/*
+	 * Acquire the mutex and grab local copies of items_available and done.
+	 */
+	gr_buffer::scoped_lock guard(*d->input(i)->mutex());
+	d_ninput_items[i] = d->input(i)->items_available ();
+	d_input_done[i] = d->input(i)->done();
+      }
+      max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
+    }
+
+    // determine the minimum available output space
+    noutput_items = min_available_space (d, m->output_multiple ());
+    if (ENABLE_LOGGING){
+      *d_log << " regular ";
+      if (m->relative_rate() >= 1.0)
+	*d_log << "1:" << m->relative_rate() << std::endl;
+      else
+	*d_log << 1.0/m->relative_rate() << ":1\n";
+      *d_log << "  max_items_avail = " << max_items_avail << std::endl;
+      *d_log << "  noutput_items = " << noutput_items << std::endl;
+    }
+    if (noutput_items == -1)		// we're done
+      goto were_done;
+
+    if (noutput_items == 0){		// we're output blocked
+      LOG(*d_log << "  BLKD_OUT\n");
+      return BLKD_OUT;
+    }
+
+  try_again:
+    if (m->fixed_rate()){
+      // try to work it forward starting with max_items_avail.
+      // We want to try to consume all the input we've got.
+      int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
+      reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
+      if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
+	noutput_items = reqd_noutput_items;
+    }
+
+    // ask the block how much input they need to produce noutput_items
+    m->forecast (noutput_items, d_ninput_items_required);
+
+    // See if we've got sufficient input available
+
+    int i;
+    for (i = 0; i < d->ninputs (); i++)
+      if (d_ninput_items_required[i] > d_ninput_items[i])	// not enough
+	break;
+
+    if (i < d->ninputs ()){			// not enough input on input[i]
+      // if we can, try reducing the size of our output request
+      if (noutput_items > m->output_multiple ()){
+	noutput_items /= 2;
+	noutput_items = round_up (noutput_items, m->output_multiple ());
+	goto try_again;
+      }
+
+      // We're blocked on input
+      LOG(*d_log << "  BLKD_IN\n");
+      if (d_input_done[i]) 	// If the upstream block is done, we're done
+	goto were_done;
+
+      // Is it possible to ever fulfill this request?
+      if (d_ninput_items_required[i] > d->input(i)->max_possible_items_available ()){
+	// Nope, never going to happen...
+	std::cerr << "\nsched: <gr_block " << m->name()
+		  << " (" << m->unique_id() << ")>"
+		  << " is requesting more input data\n"
+		  << "  than we can provide.\n"
+		  << "  ninput_items_required = "
+		  << d_ninput_items_required[i] << "\n"
+		  << "  max_possible_items_available = "
+		  << d->input(i)->max_possible_items_available() << "\n"
+		  << "  If this is a filter, consider reducing the number of taps.\n";
+	goto were_done;
+      }
+
+      return BLKD_IN;
+    }
+
+    // We've got enough data on each input to produce noutput_items.
+    // Finish setting up the call to work.
+
+    for (int i = 0; i < d->ninputs (); i++)
+      d_input_items[i] = d->input(i)->read_pointer();
+
+  setup_call_to_work:
+
+    for (int i = 0; i < d->noutputs (); i++)
+      d_output_items[i] = d->output(i)->write_pointer();
+
+    // Do the actual work of the block
+    int n = m->general_work (noutput_items, d_ninput_items,
+			     d_input_items, d_output_items);
+    LOG(*d_log << "  general_work: noutput_items = " << noutput_items
+	<< " result = " << n << std::endl);
+
+    if (n == -1)		// block is done
+      goto were_done;
+
+    d->produce_each (n);	// advance write pointers
+    if (n > 0)
+      return READY;
+
+    // We didn't produce any output even though we called general_work.
+    // We have (most likely) consumed some input.
+
+    // If this is a source, it's broken.
+    if (d->source_p()){
+      std::cerr << "gr_block_executor: source " << m
+		<< " returned 0 from work.  We're marking it DONE.\n";
+      // FIXME maybe we ought to raise an exception...
+      goto were_done;
+    }
+
+    // Have the caller try again...
+    return READY_NO_OUTPUT;
+  }
+  assert (0);
+    
+ were_done:
+  LOG(*d_log << "  were_done\n");
+  d->set_done (true);
+  return DONE;
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.h b/gnuradio-core/src/lib/runtime/gr_block_executor.h
new file mode 100644
index 0000000000..41b5ede7c8
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_block_executor.h
@@ -0,0 +1,69 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_BLOCK_EXECUTOR_H
+#define INCLUDED_GR_BLOCK_EXECUTOR_H
+
+#include <gr_runtime_types.h>
+#include <fstream>
+
+//class gr_block_executor;
+//typedef boost::shared_ptr<gr_block_executor>	gr_block_executor_sptr;
+
+
+/*!
+ * \brief Manage the execution of a single block.
+ * \ingroup internal
+ */
+
+class gr_block_executor {
+protected:
+  gr_block_sptr			d_block;	// The block we're trying to run
+  std::ofstream	       	       *d_log;
+
+  // These are allocated here so we don't have to on each iteration
+
+  gr_vector_int			d_ninput_items_required;
+  gr_vector_int			d_ninput_items;
+  gr_vector_const_void_star	d_input_items;
+  std::vector<bool>		d_input_done;
+  gr_vector_void_star		d_output_items;
+
+ public:
+  gr_block_executor(gr_block_sptr block);
+  ~gr_block_executor ();
+
+  enum state {
+    READY,	      // We made progress; everything's cool.
+    READY_NO_OUTPUT,  // We consumed some input, but produced no output.
+    BLKD_IN,	      // no progress; we're blocked waiting for input data.
+    BLKD_OUT,	      // no progress; we're blocked waiting for output buffer space.
+    DONE,	      // we're done; don't call me again.
+  };
+
+  /*
+   * \brief Run one iteration.
+   */
+  state run_one_iteration();
+};
+
+#endif /* INCLUDED_GR_BLOCK_EXECUTOR_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.cc b/gnuradio-core/src/lib/runtime/gr_buffer.cc
index 77f0c7c43d..31a471ea75 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.cc
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.cc
@@ -77,10 +77,10 @@ minimum_buffer_items (long type_size, long page_size)
 }
 
 
-gr_buffer::gr_buffer (int nitems, size_t sizeof_item)
+gr_buffer::gr_buffer (int nitems, size_t sizeof_item, gr_block_sptr link)
   : d_base (0), d_bufsize (0), d_vmcircbuf (0),
-    d_sizeof_item (sizeof_item), d_write_index (0),
-    d_done (false)
+    d_sizeof_item (sizeof_item), d_link(link),
+    d_write_index (0), d_done (false)
 {
   if (!allocate_buffer (nitems, sizeof_item))
     throw std::bad_alloc ();
@@ -89,9 +89,9 @@ gr_buffer::gr_buffer (int nitems, size_t sizeof_item)
 }
 
 gr_buffer_sptr 
-gr_make_buffer (int nitems, size_t sizeof_item)
+gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link)
 {
-  return gr_buffer_sptr (new gr_buffer (nitems, sizeof_item));
+  return gr_buffer_sptr (new gr_buffer (nitems, sizeof_item, link));
 }
 
 gr_buffer::~gr_buffer ()
@@ -146,7 +146,7 @@ gr_buffer::allocate_buffer (int nitems, size_t sizeof_item)
 
 
 int
-gr_buffer::space_available () const
+gr_buffer::space_available ()
 {
   if (d_readers.empty ())
     return d_bufsize - 1;	// See comment below
@@ -175,18 +175,27 @@ gr_buffer::write_pointer ()
 void
 gr_buffer::update_write_pointer (int nitems)
 {
+  scoped_lock	guard(*mutex());
   d_write_index = index_add (d_write_index, nitems);
 }
 
+void
+gr_buffer::set_done (bool done)
+{
+  scoped_lock	guard(*mutex());
+  d_done = done;
+}
+
 gr_buffer_reader_sptr
-gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload)
+gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link)
 {
   if (nzero_preload < 0)
     throw std::invalid_argument("gr_buffer_add_reader: nzero_preload must be >= 0");
 
   gr_buffer_reader_sptr r (new gr_buffer_reader (buf,
 						 buf->index_sub(buf->d_write_index,
-								nzero_preload)));
+								nzero_preload),
+						 link));
   buf->d_readers.push_back (r.get ());
 
   return r;
@@ -214,8 +223,9 @@ gr_buffer_ncurrently_allocated ()
 
 // ----------------------------------------------------------------------------
 
-gr_buffer_reader::gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index)
-  : d_buffer (buffer), d_read_index (read_index)
+gr_buffer_reader::gr_buffer_reader(gr_buffer_sptr buffer, unsigned int read_index,
+				   gr_block_sptr link)
+  : d_buffer(buffer), d_read_index(read_index), d_link(link)
 {
   s_buffer_reader_count++;
 }
@@ -241,6 +251,7 @@ gr_buffer_reader::read_pointer ()
 void
 gr_buffer_reader::update_read_pointer (int nitems)
 {
+  scoped_lock	guard(*mutex());
   d_read_index = d_buffer->index_add (d_read_index, nitems);
 }
 
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.h b/gnuradio-core/src/lib/runtime/gr_buffer.h
index cf578c89dd..75063cc6a1 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.h
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.h
@@ -24,6 +24,8 @@
 #define INCLUDED_GR_BUFFER_H
 
 #include <gr_runtime_types.h>
+#include <boost/weak_ptr.hpp>
+#include <boost/thread.hpp>
 
 class gr_vmcircbuf;
 
@@ -33,8 +35,12 @@ class gr_vmcircbuf;
  * The total size of the buffer will be rounded up to a system
  * dependent boundary.  This is typically the system page size, but
  * under MS windows is 64KB.
+ *
+ * \param nitems is the minimum number of items the buffer will hold.
+ * \param sizeof_item is the size of an item in bytes.
+ * \param link is the block that writes to this buffer.
  */
-gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item);
+gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link=gr_block_sptr());
 
 
 /*!
@@ -43,12 +49,20 @@ gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item);
  */
 class gr_buffer {
  public:
+
+  typedef boost::unique_lock<boost::mutex>  scoped_lock;
+
   virtual ~gr_buffer ();
 
   /*!
    * \brief return number of items worth of space available for writing
    */
-  int space_available () const;
+  int space_available ();
+
+  /*!
+   * \brief return size of this buffer in items
+   */
+  int bufsize() const { return d_bufsize; }
 
   /*!
    * \brief return pointer to write buffer.
@@ -63,17 +77,26 @@ class gr_buffer {
    */
   void update_write_pointer (int nitems);
 
-
-  void set_done (bool done)   { d_done = done; }
+  void set_done (bool done);
   bool done () const { return d_done; }
 
+  /*!
+   * \brief Return the block that writes to this buffer.
+   */
+  gr_block_sptr link() { return gr_block_sptr(d_link); }
+
+  size_t nreaders() const { return d_readers.size(); }
+  gr_buffer_reader* reader(size_t index) { return d_readers[index]; }
+
+  boost::mutex *mutex() { return &d_mutex; }
+
   // -------------------------------------------------------------------------
 
  private:
 
   friend class gr_buffer_reader;
-  friend gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item);
-  friend gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload);
+  friend gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link);
+  friend gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link);
 
  protected:
   char				       *d_base;		// base address of buffer
@@ -81,8 +104,14 @@ class gr_buffer {
  private:
   gr_vmcircbuf			       *d_vmcircbuf;
   size_t	 			d_sizeof_item;	// in bytes
-  unsigned int				d_write_index;	// in items [0,d_bufsize)
   std::vector<gr_buffer_reader *>	d_readers;
+  boost::weak_ptr<gr_block>		d_link;		// block that writes to this buffer
+
+  //
+  // The mutex protects d_write_index, d_done and the d_read_index's in the buffer readers.
+  //
+  boost::mutex				d_mutex;
+  unsigned int				d_write_index;	// in items [0,d_bufsize)
   bool					d_done;
   
   unsigned
@@ -116,11 +145,15 @@ class gr_buffer {
    *
    * Allocate a buffer that holds at least \p nitems of size \p sizeof_item.
    *
+   * \param nitems is the minimum number of items the buffer will hold.
+   * \param sizeof_item is the size of an item in bytes.
+   * \param link is the block that writes to this buffer.
+   *
    * The total size of the buffer will be rounded up to a system
    * dependent boundary.  This is typically the system page size, but
    * under MS windows is 64KB.
    */
-  gr_buffer (int nitems, size_t sizeof_item);
+  gr_buffer (int nitems, size_t sizeof_item, gr_block_sptr link);
 
   /*!
    * \brief disassociate \p reader from this buffer
@@ -132,8 +165,10 @@ class gr_buffer {
 /*!
  * \brief create a new gr_buffer_reader and attach it to buffer \p buf
  * \param nzero_preload -- number of zero items to "preload" into buffer.
+ * \param link is the block that reads from the buffer using this gr_buffer_reader.
  */
-gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload);
+gr_buffer_reader_sptr 
+gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link=gr_block_sptr());
 
 //! returns # of gr_buffers currently allocated
 long gr_buffer_ncurrently_allocated ();
@@ -147,8 +182,10 @@ long gr_buffer_ncurrently_allocated ();
  */
 
 class gr_buffer_reader {
-
  public:
+
+  typedef gr_buffer::scoped_lock scoped_lock;
+
   ~gr_buffer_reader ();
 
   /*!
@@ -183,19 +220,29 @@ class gr_buffer_reader {
   void set_done (bool done)   { d_buffer->set_done (done); }
   bool done () const { return d_buffer->done (); }
 
+  boost::mutex *mutex() { return d_buffer->mutex(); }
+
+
+  /*!
+   * \brief Return the block that reads via this reader.
+   */
+  gr_block_sptr link() { return gr_block_sptr(d_link); }
+
   // -------------------------------------------------------------------------
 
  private:
 
   friend class gr_buffer;
-  friend gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload);
+  friend gr_buffer_reader_sptr 
+  gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link);
 
 
   gr_buffer_sptr		d_buffer;
   unsigned int			d_read_index;	// in items [0,d->buffer.d_bufsize)
+  boost::weak_ptr<gr_block>	d_link;		// block that reads via this buffer reader
 
   //! constructor is private.  Use gr_buffer::add_reader to create instances
-  gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index);
+  gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index, gr_block_sptr link);
 };
 
 //! returns # of gr_buffer_readers currently allocated
diff --git a/gnuradio-core/src/lib/runtime/gr_buffer.i b/gnuradio-core/src/lib/runtime/gr_buffer.i
index 38e1d945da..4c1c5afae5 100644
--- a/gnuradio-core/src/lib/runtime/gr_buffer.i
+++ b/gnuradio-core/src/lib/runtime/gr_buffer.i
@@ -26,14 +26,14 @@ typedef boost::shared_ptr<gr_buffer> gr_buffer_sptr;
 %rename(buffer) gr_make_buffer;
 %ignore gr_buffer;
 
-gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item);
+gr_buffer_sptr gr_make_buffer (int nitems, size_t sizeof_item, gr_block_sptr link);
 
 class gr_buffer {
  public:
   ~gr_buffer ();
 
  private:
-  gr_buffer (int nitems, size_t sizeof_item);
+  gr_buffer (int nitems, size_t sizeof_item, gr_block_sptr link);
 };
   
 
@@ -43,7 +43,7 @@ typedef boost::shared_ptr<gr_buffer_reader> gr_buffer_reader_sptr;
 %ignore gr_buffer_reader;
 
 %rename(buffer_add_reader) gr_buffer_add_reader;
-gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload);
+gr_buffer_reader_sptr gr_buffer_add_reader (gr_buffer_sptr buf, int nzero_preload, gr_block_sptr link);
 
 class gr_buffer_reader {
  public:
@@ -51,7 +51,7 @@ class gr_buffer_reader {
 
  private:
   friend class gr_buffer;
-  gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index);
+  gr_buffer_reader (gr_buffer_sptr buffer, unsigned int read_index, gr_block_sptr link);
 };
 
 
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
index aa1aa83532..031eb6dfd5 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
@@ -33,6 +33,11 @@
 
 #define GR_FLAT_FLOWGRAPH_DEBUG 0
 
+// 32Kbyte buffer size between blocks
+#define GR_FIXED_BUFFER_SIZE (32*(1L<<10))
+
+static const unsigned int s_fixed_buffer_size = GR_FIXED_BUFFER_SIZE;
+
 gr_flat_flowgraph_sptr
 gr_make_flat_flowgraph()
 {
@@ -54,7 +59,7 @@ gr_flat_flowgraph::setup_connections()
 
   // Assign block details to blocks
   for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++)
-    make_gr_block_sptr(*p)->set_detail(allocate_block_detail(*p));
+    cast_to_block_sptr(*p)->set_detail(allocate_block_detail(*p));
 
   // Connect inputs to outputs for each block
   for(gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++)
@@ -84,11 +89,15 @@ gr_flat_flowgraph::allocate_block_detail(gr_basic_block_sptr block)
 gr_buffer_sptr
 gr_flat_flowgraph::allocate_buffer(gr_basic_block_sptr block, int port)
 {
-  gr_block_sptr grblock = make_gr_block_sptr(block);
+  gr_block_sptr grblock = cast_to_block_sptr(block);
   if (!grblock)
     throw std::runtime_error("allocate_buffer found non-gr_block");
   int item_size = block->output_signature()->sizeof_stream_item(port);
-  int nitems = s_fixed_buffer_size/item_size;
+
+  // *2 because we're now only filling them 1/2 way in order to
+  // increase the available parallelism when using the TPB scheduler.
+  // (We're double buffering, where we used to single buffer)
+  int nitems = s_fixed_buffer_size * 2 / item_size;
 
   // Make sure there are at least twice the output_multiple no. of items
   if (nitems < 2*grblock->output_multiple())	// Note: this means output_multiple()
@@ -99,7 +108,7 @@ gr_flat_flowgraph::allocate_buffer(gr_basic_block_sptr block, int port)
   gr_basic_block_vector_t blocks = calc_downstream_blocks(block, port);
 
   for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
-    gr_block_sptr dgrblock = make_gr_block_sptr(*p);
+    gr_block_sptr dgrblock = cast_to_block_sptr(*p);
     if (!dgrblock)
       throw std::runtime_error("allocate_buffer found non-gr_block");
 
@@ -109,13 +118,13 @@ gr_flat_flowgraph::allocate_buffer(gr_basic_block_sptr block, int port)
     nitems = std::max(nitems, static_cast<int>(2*(decimation*multiple+history)));
   }
 
-  return gr_make_buffer(nitems, item_size);
+  return gr_make_buffer(nitems, item_size, grblock);
 }
 
 void
 gr_flat_flowgraph::connect_block_inputs(gr_basic_block_sptr block)
 {
-  gr_block_sptr grblock = make_gr_block_sptr(block);
+  gr_block_sptr grblock = cast_to_block_sptr(block);
   if (!grblock)
     throw std::runtime_error("connect_block_inputs found non-gr_block");
   
@@ -130,7 +139,7 @@ gr_flat_flowgraph::connect_block_inputs(gr_basic_block_sptr block)
     int dst_port = e->dst().port();
     int src_port = e->src().port();
     gr_basic_block_sptr src_block = e->src().block();
-    gr_block_sptr src_grblock = make_gr_block_sptr(src_block);
+    gr_block_sptr src_grblock = cast_to_block_sptr(src_block);
     if (!src_grblock)
       throw std::runtime_error("connect_block_inputs found non-gr_block");
     gr_buffer_sptr src_buffer = src_grblock->detail()->output(src_port);
@@ -138,7 +147,7 @@ gr_flat_flowgraph::connect_block_inputs(gr_basic_block_sptr block)
     if (GR_FLAT_FLOWGRAPH_DEBUG)
       std::cout << "Setting input " << dst_port << " from edge " << (*e) << std::endl;
 
-    detail->set_input(dst_port, gr_buffer_add_reader(src_buffer, grblock->history()-1));
+    detail->set_input(dst_port, gr_buffer_add_reader(src_buffer, grblock->history()-1, grblock));
   }
 }
 
@@ -149,7 +158,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
   // by flattening will need one; existing blocks still in the new flowgraph will
   // already have one.
   for (gr_basic_block_viter_t p = d_blocks.begin(); p != d_blocks.end(); p++) {
-    gr_block_sptr block = make_gr_block_sptr(*p);
+    gr_block_sptr block = cast_to_block_sptr(*p);
     
     if (!block->detail()) {
       if (GR_FLAT_FLOWGRAPH_DEBUG)
@@ -177,7 +186,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
       if (GR_FLAT_FLOWGRAPH_DEBUG)
 	std::cout << "not in new edge list" << std::endl;
       // zero the buffer reader on RHS of old edge
-      gr_block_sptr block(make_gr_block_sptr(old_edge->dst().block()));
+      gr_block_sptr block(cast_to_block_sptr(old_edge->dst().block()));
       int port = old_edge->dst().port();
       block->detail()->set_input(port, gr_buffer_reader_sptr());
     }
@@ -189,7 +198,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
 
   // Now connect inputs to outputs, reusing old buffer readers if they exist
   for (gr_basic_block_viter_t p = d_blocks.begin(); p != d_blocks.end(); p++) {
-    gr_block_sptr block = make_gr_block_sptr(*p);
+    gr_block_sptr block = cast_to_block_sptr(*p);
 
     if (GR_FLAT_FLOWGRAPH_DEBUG)
       std::cout << "merge: merging " << (*p) << "...";
@@ -208,7 +217,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
 	gr_edge edge = calc_upstream_edge(*p, i);
 
 	// Fish out old buffer reader and see if it matches correct buffer from edge list
-	gr_block_sptr src_block = make_gr_block_sptr(edge.src().block());
+	gr_block_sptr src_block = cast_to_block_sptr(edge.src().block());
 	gr_block_detail_sptr src_detail = src_block->detail();
 	gr_buffer_sptr src_buffer = src_detail->output(edge.src().port());
 	gr_buffer_reader_sptr old_reader;
@@ -225,7 +234,7 @@ gr_flat_flowgraph::merge_connections(gr_flat_flowgraph_sptr old_ffg)
 	    std::cout << "needs a new reader" << std::endl;
 
 	  // Create new buffer reader and assign
-	  detail->set_input(i, gr_buffer_add_reader(src_buffer, block->history()-1));
+	  detail->set_input(i, gr_buffer_add_reader(src_buffer, block->history()-1, block));
 	}
       }
     }
@@ -248,7 +257,7 @@ void gr_flat_flowgraph::dump()
 
   for (gr_basic_block_viter_t p = d_blocks.begin(); p != d_blocks.end(); p++) {
     std::cout << " block: " << (*p) << std::endl;
-    gr_block_detail_sptr detail = make_gr_block_sptr(*p)->detail();
+    gr_block_detail_sptr detail = cast_to_block_sptr(*p)->detail();
     std::cout << "  detail @" << detail << ":" << std::endl;
      
     int ni = detail->ninputs();
@@ -269,3 +278,14 @@ void gr_flat_flowgraph::dump()
   }
 
 }
+
+gr_block_vector_t
+gr_flat_flowgraph::make_block_vector(gr_basic_block_vector_t &blocks)
+{
+  gr_block_vector_t result;
+  for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
+    result.push_back(cast_to_block_sptr(*p));
+  }
+
+  return result;
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
index 184ee45144..673c4df16f 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
@@ -26,9 +26,6 @@
 #include <gr_flowgraph.h>
 #include <gr_block.h>
 
-// 32Kbyte buffer size between blocks
-#define GR_FIXED_BUFFER_SIZE (32*(1L<<10))
-
 // Create a shared pointer to a heap allocated gr_flat_flowgraph
 // (types defined in gr_runtime_types.h)
 gr_flat_flowgraph_sptr gr_make_flat_flowgraph();
@@ -55,10 +52,14 @@ public:
 
   void dump();
 
+  /*!
+   * Make a vector of gr_block from a vector of gr_basic_block
+   */
+  static gr_block_vector_t make_block_vector(gr_basic_block_vector_t &blocks);
+
 private:
   gr_flat_flowgraph();
 
-  static const unsigned int s_fixed_buffer_size = GR_FIXED_BUFFER_SIZE;
   gr_block_detail_sptr allocate_block_detail(gr_basic_block_sptr block);
   gr_buffer_sptr allocate_buffer(gr_basic_block_sptr block, int port);
   void connect_block_inputs(gr_basic_block_sptr block);
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
index c97a50782c..fc407e72be 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
@@ -122,6 +122,9 @@ public:
   // Return vector of connected blocks
   gr_basic_block_vector_t calc_used_blocks();
 
+  // Return toplogically sorted vector of blocks.  All the sources come first.
+  gr_basic_block_vector_t topological_sort(gr_basic_block_vector_t &blocks);
+
   // Return vector of vectors of disjointly connected blocks, topologically
   // sorted.
   std::vector<gr_basic_block_vector_t> partition();
@@ -149,7 +152,6 @@ private:
   gr_basic_block_vector_t calc_reachable_blocks(gr_basic_block_sptr block, gr_basic_block_vector_t &blocks);
   void reachable_dfs_visit(gr_basic_block_sptr block, gr_basic_block_vector_t &blocks);
   gr_basic_block_vector_t calc_adjacent_blocks(gr_basic_block_sptr block, gr_basic_block_vector_t &blocks);
-  gr_basic_block_vector_t topological_sort(gr_basic_block_vector_t &blocks);
   gr_basic_block_vector_t sort_sources_first(gr_basic_block_vector_t &blocks);
   bool source_p(gr_basic_block_sptr block);
   void topological_dfs_visit(gr_basic_block_sptr block, gr_basic_block_vector_t &output);
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
index 32cac2ea8c..a026851d20 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
@@ -303,7 +303,7 @@ gr_hier_block2_detail::resolve_endpoint(const gr_endpoint &endp, bool is_input)
   std::stringstream msg;
 
   // Check if endpoint is a leaf node
-  if (make_gr_block_sptr(endp.block()))
+  if (cast_to_block_sptr(endp.block()))
     return endp;
   
   // Check if endpoint is a hierarchical block
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler.cc b/gnuradio-core/src/lib/runtime/gr_scheduler.cc
new file mode 100644
index 0000000000..e4d8b3dd9a
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler.cc
@@ -0,0 +1,33 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_scheduler.h>
+
+gr_scheduler::gr_scheduler(gr_flat_flowgraph_sptr ffg)
+{
+}
+
+gr_scheduler::~gr_scheduler()
+{
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler.h b/gnuradio-core/src/lib/runtime/gr_scheduler.h
new file mode 100644
index 0000000000..13bc1ff145
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler.h
@@ -0,0 +1,64 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifndef INCLUDED_GR_SCHEDULER_H
+#define INCLUDED_GR_SCHEDULER_H
+
+#include <boost/utility.hpp>
+#include <gr_block.h>
+#include <gr_flat_flowgraph.h>
+
+
+class gr_scheduler;
+typedef boost::shared_ptr<gr_scheduler> gr_scheduler_sptr;
+
+
+/*!
+ * \brief Abstract scheduler that takes a flattened flow graph and runs it.
+ *
+ * Preconditions: details, buffers and buffer readers have been assigned.
+ */
+class gr_scheduler : boost::noncopyable
+{
+
+public:
+  /*!
+   * \brief Construct a scheduler and begin evaluating the graph.
+   *
+   * The scheduler will continue running until all blocks until they
+   * report that they are done or the stop method is called.
+   */
+  gr_scheduler(gr_flat_flowgraph_sptr ffg);
+
+  virtual ~gr_scheduler();
+
+  /*!
+   * \brief Tell the scheduler to stop executing.
+   */
+  virtual void stop() = 0;
+
+  /*!
+   * \brief Block until the graph is done.
+   */
+  virtual void wait() = 0;
+};
+
+#endif /* INCLUDED_GR_SCHEDULER_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc
new file mode 100644
index 0000000000..fefc0dc703
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.cc
@@ -0,0 +1,87 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_scheduler_sts.h>
+#include <gr_single_threaded_scheduler.h>
+#include <gruel/thread_body_wrapper.h>
+
+class sts_container
+{
+  gr_block_vector_t	d_blocks;
+  
+public:
+
+  sts_container(gr_block_vector_t blocks)
+    : d_blocks(blocks) {}
+
+  void operator()()
+  {
+    gr_make_single_threaded_scheduler(d_blocks)->run();
+  }
+};
+
+
+gr_scheduler_sptr
+gr_scheduler_sts::make(gr_flat_flowgraph_sptr ffg)
+{
+  return gr_scheduler_sptr(new gr_scheduler_sts(ffg));
+}
+
+gr_scheduler_sts::gr_scheduler_sts(gr_flat_flowgraph_sptr ffg)
+  : gr_scheduler(ffg)
+{
+  // Split the flattened flow graph into discrete partitions, each
+  // of which is topologically sorted.
+
+  std::vector<gr_basic_block_vector_t> graphs = ffg->partition();
+
+  // For each partition, create a thread to evaluate it using
+  // an instance of the gr_single_threaded_scheduler
+
+  for (std::vector<gr_basic_block_vector_t>::iterator p = graphs.begin();
+       p != graphs.end(); p++) {
+
+    gr_block_vector_t blocks = gr_flat_flowgraph::make_block_vector(*p);
+    d_threads.create_thread(
+        gruel::thread_body_wrapper<sts_container>(sts_container(blocks),
+						  "single-threaded-scheduler"));
+  }
+}
+
+gr_scheduler_sts::~gr_scheduler_sts()
+{
+  stop();
+}
+
+void
+gr_scheduler_sts::stop()
+{
+  d_threads.interrupt_all();
+}
+
+void
+gr_scheduler_sts::wait()
+{
+  d_threads.join_all();
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h
new file mode 100644
index 0000000000..4cf8351561
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_sts.h
@@ -0,0 +1,62 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GR_SCHEDULER_STS_H
+#define INCLUDED_GR_SCHEDULER_STS_H
+
+#include <gr_scheduler.h>
+#include <gruel/thread_group.h>
+
+/*!
+ * \brief Concrete scheduler that uses the single_threaded_scheduler
+ */
+class gr_scheduler_sts : public gr_scheduler
+{
+  gruel::thread_group		       d_threads;
+
+protected:
+  /*!
+   * \brief Construct a scheduler and begin evaluating the graph.
+   *
+   * The scheduler will continue running until all blocks until they
+   * report that they are done or the stop method is called.
+   */
+  gr_scheduler_sts(gr_flat_flowgraph_sptr ffg);
+
+public:
+  static gr_scheduler_sptr make(gr_flat_flowgraph_sptr ffg);
+
+  ~gr_scheduler_sts();
+
+  /*!
+   * \brief Tell the scheduler to stop executing.
+   */
+  void stop();
+
+  /*!
+   * \brief Block until the graph is done.
+   */
+  void wait();
+};
+
+
+
+
+#endif /* INCLUDED_GR_SCHEDULER_STS_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.cc b/gnuradio-core/src/lib/runtime/gr_scheduler_thread.cc
deleted file mode 100644
index 07bd60500d..0000000000
--- a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.cc
+++ /dev/null
@@ -1,110 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2007 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- *
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING.  If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include <gr_scheduler_thread.h>
-#include <iostream>
-#include <stdio.h>
-
-#ifdef HAVE_SIGNAL_H
-#include <signal.h>
-#endif
-
-#define GR_SCHEDULER_THREAD_DEBUG 0
-
-gr_scheduler_thread::gr_scheduler_thread(gr_block_vector_t graph) :
-  omni_thread(NULL, PRIORITY_NORMAL),
-  d_sts(gr_make_single_threaded_scheduler(graph))
-{
-}
-
-gr_scheduler_thread::~gr_scheduler_thread()
-{
-}
-
-void gr_scheduler_thread::start()
-{
-  if (GR_SCHEDULER_THREAD_DEBUG)
-    std::cout << "gr_scheduler_thread::start() "
-	      << this << std::endl;
-  start_undetached();
-}
-
-void *
-gr_scheduler_thread::run_undetached(void *arg)
-{
-  // This is the first code to run in the new thread context.
-
-  /*
-   * In general, on a *nix system, any thread of a process can receive
-   * any asynchronous signal.
-   *
-   * http://www.serpentine.com/blog/threads-faq/mixing-threads-and-signals-unix/
-   * http://www.linuxjournal.com/article/2121
-   * 
-   * We really don't want to be handling asynchronous signals such
-   * as SIGINT and SIGHUP here.  We mask them off in the signal
-   * processing threads so that they'll get handled by the mainline
-   * thread.  We leave the synchronous signals SIGQUIT, SIGBUS,
-   * SIGILL, SIGSEGV etc alone
-   *
-   * FIXME? It might be better to mask them all off in the parent
-   * thread then dedicate a single thread to handling all signals
-   * using sigwait.
-   */
-#if defined(HAVE_PTHREAD_SIGMASK) || defined(HAVE_SIGPROCMASK)
-  sigset_t old_set;
-  sigset_t new_set;
-  int r;
-  sigemptyset(&new_set);
-  sigaddset(&new_set, SIGINT);
-  sigaddset(&new_set, SIGHUP);
-  sigaddset(&new_set, SIGPIPE);
-  sigaddset(&new_set, SIGALRM);
-  sigaddset(&new_set, SIGCHLD);
-
-#ifdef HAVE_PTHREAD_SIGMASK
-  r = pthread_sigmask(SIG_BLOCK, &new_set, &old_set);
-  if (r != 0)
-    perror("pthread_sigmask");
-#else
-  r = sigprocmask(SIG_BLOCK, &new_set, &old_set);
-  if (r != 0)
-    perror("sigprocmask");
-#endif
-#endif
-  // Run the single-threaded scheduler
-  d_sts->run();
-  return 0;
-}
-
-void
-gr_scheduler_thread::stop()
-{
-  if (0 && GR_SCHEDULER_THREAD_DEBUG)		// FIXME not safe to call from signal handler
-    std::cout << "gr_scheduler_thread::stop() "
-	      << this << std::endl;
-  d_sts->stop();
-}
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.h b/gnuradio-core/src/lib/runtime/gr_scheduler_thread.h
deleted file mode 100644
index 89daba4031..0000000000
--- a/gnuradio-core/src/lib/runtime/gr_scheduler_thread.h
+++ /dev/null
@@ -1,59 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2007 Free Software Foundation, Inc.
- * 
- * This file is part of GNU Radio
- * 
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- * 
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- * 
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING.  If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifndef INCLUDED_GR_SCHEDULER_THREAD_H
-#define INCLUDED_GR_SCHEDULER_THREAD_H
-
-#include <omnithread.h>
-#include <gr_single_threaded_scheduler.h>
-#include <gr_block.h>
-
-// omnithread calls delete on itself after thread exits, so can't use shared ptr
-class gr_scheduler_thread;
-typedef std::vector<gr_scheduler_thread *> gr_scheduler_thread_vector_t;
-typedef gr_scheduler_thread_vector_t::iterator gr_scheduler_thread_viter_t;
-
-/*!
- *\brief A single thread of execution for the scheduler
- *
- * \ingroup internal
- * This class implements a single thread that runs undetached, and
- * invokes the single-threaded block scheduler.  The runtime makes
- * one of these for each distinct partition of a flowgraph and runs
- * them in parallel.
- *
- */
-class gr_scheduler_thread : public omni_thread
-{
-private:
-  gr_single_threaded_scheduler_sptr d_sts;    
-
-public:
-  gr_scheduler_thread(gr_block_vector_t graph);
-  ~gr_scheduler_thread();
-
-  virtual void *run_undetached(void *arg);
-  void start();
-  void stop();
-};
-
-#endif /* INCLUDED_GR_SCHEDULER_THREAD_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc
new file mode 100644
index 0000000000..af03385705
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.cc
@@ -0,0 +1,95 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_scheduler_tpb.h>
+#include <gr_tpb_thread_body.h>
+#include <gruel/thread_body_wrapper.h>
+#include <sstream>
+
+/*
+ * You know, a lambda expression would be sooo much easier...
+ */
+class tpb_container
+{
+  gr_block_sptr	d_block;
+  
+public:
+  tpb_container(gr_block_sptr block) : d_block(block) {}
+
+  void operator()()
+  {
+    gr_tpb_thread_body	body(d_block);
+  }
+};
+
+
+gr_scheduler_sptr
+gr_scheduler_tpb::make(gr_flat_flowgraph_sptr ffg)
+{
+  return gr_scheduler_sptr(new gr_scheduler_tpb(ffg));
+}
+
+gr_scheduler_tpb::gr_scheduler_tpb(gr_flat_flowgraph_sptr ffg)
+  : gr_scheduler(ffg)
+{
+  // Get a topologically sorted vector of all the blocks in use.
+  // Being topologically sorted probably isn't going to matter, but
+  // there's a non-zero chance it might help...
+
+  gr_basic_block_vector_t used_blocks = ffg->calc_used_blocks();
+  used_blocks = ffg->topological_sort(used_blocks);
+  gr_block_vector_t blocks = gr_flat_flowgraph::make_block_vector(used_blocks);
+
+  // Ensure that the done flag is clear on all blocks
+
+  for (size_t i = 0; i < blocks.size(); i++){
+    blocks[i]->detail()->set_done(false);
+  }
+
+  // Fire off a thead for each block
+
+  for (size_t i = 0; i < blocks.size(); i++){
+    std::stringstream name;
+    name << "thread-per-block[" << i << "]: " << blocks[i];
+    d_threads.create_thread(
+      gruel::thread_body_wrapper<tpb_container>(tpb_container(blocks[i]), name.str()));
+  }
+}
+
+gr_scheduler_tpb::~gr_scheduler_tpb()
+{
+  stop();
+}
+
+void
+gr_scheduler_tpb::stop()
+{
+  d_threads.interrupt_all();
+}
+
+void
+gr_scheduler_tpb::wait()
+{
+  d_threads.join_all();
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h
new file mode 100644
index 0000000000..16a0c0204e
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_scheduler_tpb.h
@@ -0,0 +1,60 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GR_SCHEDULER_TPB_H
+#define INCLUDED_GR_SCHEDULER_TPB_H
+
+#include <gr_scheduler.h>
+#include <gruel/thread_group.h>
+
+/*!
+ * \brief Concrete scheduler that uses a kernel thread-per-block
+ */
+class gr_scheduler_tpb : public gr_scheduler
+{
+  gruel::thread_group		       d_threads;
+
+protected:
+  /*!
+   * \brief Construct a scheduler and begin evaluating the graph.
+   *
+   * The scheduler will continue running until all blocks until they
+   * report that they are done or the stop method is called.
+   */
+  gr_scheduler_tpb(gr_flat_flowgraph_sptr ffg);
+
+public:
+  static gr_scheduler_sptr make(gr_flat_flowgraph_sptr ffg);
+
+  ~gr_scheduler_tpb();
+
+  /*!
+   * \brief Tell the scheduler to stop executing.
+   */
+  void stop();
+
+  /*!
+   * \brief Block until the graph is done.
+   */
+  void wait();
+};
+
+
+#endif /* INCLUDED_GR_SCHEDULER_TPB_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc b/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc
index b2fbdb73be..7f1b40641e 100644
--- a/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc
+++ b/gnuradio-core/src/lib/runtime/gr_single_threaded_scheduler.cc
@@ -28,6 +28,7 @@
 #include <gr_block.h>
 #include <gr_block_detail.h>
 #include <gr_buffer.h>
+#include <boost/thread.hpp>
 #include <iostream>
 #include <limits>
 #include <assert.h>
@@ -44,14 +45,6 @@
 
 static int which_scheduler  = 0;
 
-
-std::ostream&
-operator << (std::ostream& os, const gr_block *m)
-{
-  os << "<gr_block " << m->name() << " (" << m->unique_id() << ")>";
-  return os;
-}
-
 gr_single_threaded_scheduler_sptr
 gr_make_single_threaded_scheduler (const std::vector<gr_block_sptr> &blocks)
 {
@@ -162,6 +155,9 @@ gr_single_threaded_scheduler::main_loop ()
   nalive = d_blocks.size ();
   while (d_enabled && nalive > 0){
 
+    if (boost::this_thread::interruption_requested())
+      break;
+
     gr_block		*m = d_blocks[bi].get ();
     gr_block_detail	*d = m->detail().get ();
 
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block.cc b/gnuradio-core/src/lib/runtime/gr_top_block.cc
index 3c8e28f701..09e46dfbb4 100644
--- a/gnuradio-core/src/lib/runtime/gr_top_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_top_block.cc
@@ -27,7 +27,6 @@
 #include <unistd.h>
 #include <gr_top_block.h>
 #include <gr_top_block_impl.h>
-#include <gr_top_block_impl_sts.h>
 #include <gr_io_signature.h>
 #include <iostream>
 
@@ -43,7 +42,7 @@ gr_top_block::gr_top_block(const std::string &name)
 		   gr_make_io_signature(0,0,0))
   
 {
-  d_impl = new gr_top_block_impl_sts(this);
+  d_impl = new gr_top_block_impl(this);
 }
   
 gr_top_block::~gr_top_block()
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc b/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
index 5914379384..50d480d009 100644
--- a/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
+++ b/gnuradio-core/src/lib/runtime/gr_top_block_impl.cc
@@ -27,21 +27,58 @@
 #include <gr_top_block.h>
 #include <gr_top_block_impl.h>
 #include <gr_flat_flowgraph.h>
-#include <gr_scheduler_thread.h>
-#include <gr_local_sighandler.h>
+#include <gr_scheduler_sts.h>
+#include <gr_scheduler_tpb.h>
 
 #include <stdexcept>
 #include <iostream>
 #include <string.h>
 #include <unistd.h>
+#include <stdlib.h>
 
 #define GR_TOP_BLOCK_IMPL_DEBUG 0
 
+
+typedef gr_scheduler_sptr (*scheduler_maker)(gr_flat_flowgraph_sptr ffg);
+
+static struct scheduler_table {
+  const char 	       *name;
+  scheduler_maker	f;
+} scheduler_table[] = {
+  { "TPB",	gr_scheduler_tpb::make },	// first entry is default
+  { "STS",	gr_scheduler_sts::make }
+};
+
+static gr_scheduler_sptr
+make_scheduler(gr_flat_flowgraph_sptr ffg)
+{
+  static scheduler_maker  factory = 0;
+
+  if (factory == 0){
+    char *v = getenv("GR_SCHEDULER");
+    if (!v)
+      factory = scheduler_table[0].f;	// use default
+    else {
+      for (size_t i = 0; i < sizeof(scheduler_table)/sizeof(scheduler_table[0]); i++){
+	if (strcmp(v, scheduler_table[i].name) == 0){
+	  factory = scheduler_table[i].f;
+	  break;
+	}
+      }
+      if (factory == 0){
+	std::cerr << "warning: Invalid GR_SCHEDULER environment variable value \""
+		  << v << "\".  Using \"" << scheduler_table[0].name << "\"\n";
+	factory = scheduler_table[0].f;
+      }
+    }
+  }
+  return factory(ffg);
+}
+
+
 gr_top_block_impl::gr_top_block_impl(gr_top_block *owner) 
-  : d_owner(owner),
-    d_running(false),
-    d_ffg(),
-    d_lock_count(0)
+  : d_owner(owner), d_ffg(),
+    d_state(IDLE), d_lock_count(0)
 {
 }
 
@@ -53,14 +90,13 @@ gr_top_block_impl::~gr_top_block_impl()
 void
 gr_top_block_impl::start()
 {
-  if (GR_TOP_BLOCK_IMPL_DEBUG)
-    std::cout << "start: entered " << this << std::endl;
+  gr_lock_guard	l(d_mutex);
 
-  if (d_running)
+  if (d_state != IDLE)
     throw std::runtime_error("top_block::start: top block already running or wait() not called after previous stop()");
 
   if (d_lock_count > 0)
-    throw std::runtime_error("top_block::start: can't call start with flow graph locked");
+    throw std::runtime_error("top_block::start: can't start with flow graph locked");
 
   // Create new flat flow graph by flattening hierarchy
   d_ffg = d_owner->flatten();
@@ -69,77 +105,71 @@ gr_top_block_impl::start()
   d_ffg->validate();
   d_ffg->setup_connections();
 
-  // Execute scheduler threads
-  start_threads();
-  d_running = true;
+  d_scheduler = make_scheduler(d_ffg);
+  d_state = RUNNING;
 }
 
+void 
+gr_top_block_impl::stop()
+{
+  if (d_scheduler)
+    d_scheduler->stop();
+}
+
+
+void
+gr_top_block_impl::wait()
+{
+  if (d_scheduler)
+    d_scheduler->wait();
+
+  d_state = IDLE;
+}
 
 // N.B. lock() and unlock() cannot be called from a flow graph thread or
 // deadlock will occur when reconfiguration happens
 void
 gr_top_block_impl::lock()
 {
-  omni_mutex_lock lock(d_reconf);
+  gr_lock_guard lock(d_mutex);
   d_lock_count++;
-  if (GR_TOP_BLOCK_IMPL_DEBUG)
-    std::cout << "runtime: locked, count = " << d_lock_count <<  std::endl;
 }
 
 void
 gr_top_block_impl::unlock()
 {
-  omni_mutex_lock lock(d_reconf);
+  gr_lock_guard lock(d_mutex);
+
   if (d_lock_count <= 0){
     d_lock_count = 0;		// fix it, then complain
     throw std::runtime_error("unpaired unlock() call");
   }
 
   d_lock_count--;
-  if (GR_TOP_BLOCK_IMPL_DEBUG)
-    std::cout << "unlock: unlocked, count = " << d_lock_count << std::endl;
+  if (d_lock_count > 0 || d_state == IDLE) // nothing to do
+    return;
 
-  if (d_lock_count == 0) {
-    if (GR_TOP_BLOCK_IMPL_DEBUG)
-      std::cout << "unlock: restarting flowgraph" << std::endl;
-    restart();
-  }
+  restart();
 }
 
+/*
+ * restart is called with d_mutex held
+ */
 void
 gr_top_block_impl::restart()
 {
-  if (GR_TOP_BLOCK_IMPL_DEBUG)
-    std::cout << "restart: entered" << std::endl;
-
-  if (!d_running)
-    return;		// nothing to do
-
-  // Stop scheduler threads and wait for completion
-  stop();
+  stop();		     // Stop scheduler and wait for completion
   wait();
-  if (GR_TOP_BLOCK_IMPL_DEBUG)
-    std::cout << "restart: threads stopped" << std::endl;
 
   // Create new simple flow graph
   gr_flat_flowgraph_sptr new_ffg = d_owner->flatten();        
   new_ffg->validate();		       // check consistency, sanity, etc
-
-  if (GR_TOP_BLOCK_IMPL_DEBUG) {
-      std::cout << std::endl << "*** Existing flat flowgraph @" << d_ffg << ":" << std::endl;
-      d_ffg->dump();
-  }
   new_ffg->merge_connections(d_ffg);   // reuse buffers, etc
-
-  if (GR_TOP_BLOCK_IMPL_DEBUG) {
-    std::cout << std::endl << "*** New flat flowgraph after merge @" << new_ffg << ":" << std::endl;
-    new_ffg->dump();
-  }
-  
   d_ffg = new_ffg;
 
-  start_threads();
-  d_running = true;
+  // Create a new scheduler to execute it
+  d_scheduler = make_scheduler(d_ffg);
+  d_state = RUNNING;
 }
 
 void
@@ -148,14 +178,3 @@ gr_top_block_impl::dump()
   if (d_ffg)
     d_ffg->dump();
 }
-
-gr_block_vector_t
-gr_top_block_impl::make_gr_block_vector(gr_basic_block_vector_t blocks)
-{
-  gr_block_vector_t result;
-  for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
-    result.push_back(make_gr_block_sptr(*p));
-  }
-
-  return result;
-}
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl.h b/gnuradio-core/src/lib/runtime/gr_top_block_impl.h
index 869f788ef4..35fb44ef92 100644
--- a/gnuradio-core/src/lib/runtime/gr_top_block_impl.h
+++ b/gnuradio-core/src/lib/runtime/gr_top_block_impl.h
@@ -23,7 +23,11 @@
 #ifndef INCLUDED_GR_TOP_BLOCK_IMPL_H
 #define INCLUDED_GR_TOP_BLOCK_IMPL_H
 
-#include <gr_scheduler_thread.h>
+#include <gr_scheduler.h>
+#include <boost/thread.hpp>
+
+typedef boost::mutex			gr_mutex; 	// FIXME move these elsewhere
+typedef boost::lock_guard<boost::mutex>	gr_lock_guard;
 
 /*!
  *\brief Abstract implementation details of gr_top_block
@@ -37,16 +41,16 @@ class gr_top_block_impl
 {
 public:
   gr_top_block_impl(gr_top_block *owner);
-  virtual ~gr_top_block_impl();
+  ~gr_top_block_impl();
 
   // Create and start scheduler threads
-  virtual void start();
+  void start();
 
   // Signal scheduler threads to stop
-  virtual void stop() = 0;
+  void stop();
 
   // Wait for scheduler threads to exit
-  virtual void wait() = 0;
+  void wait();
 
   // Lock the top block to allow reconfiguration
   void lock();
@@ -59,22 +63,16 @@ public:
   
 protected:
     
+  enum tb_state { IDLE, RUNNING };
+
   gr_top_block                  *d_owner;
-  bool                           d_running;
   gr_flat_flowgraph_sptr         d_ffg;
+  gr_scheduler_sptr		 d_scheduler;
 
-  omni_mutex                     d_reconf;	// protects d_lock_count
+  gr_mutex                       d_mutex;	// protects d_state and d_lock_count
+  tb_state			 d_state;
   int                            d_lock_count;
-
-  virtual void start_threads() = 0;
-
-/*!
- * Make a vector of gr_block from a vector of gr_basic_block
- *
- * Pass-by-value to avoid problem with possible asynchronous modification
- */
-  static gr_block_vector_t make_gr_block_vector(gr_basic_block_vector_t blocks);
-
+  
 private:
   void restart();
 };
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.cc b/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.cc
deleted file mode 100644
index b3e9da6275..0000000000
--- a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.cc
+++ /dev/null
@@ -1,128 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2007,2008 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- *
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING.  If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include <gr_top_block.h>
-#include <gr_top_block_impl_sts.h>
-#include <gr_flat_flowgraph.h>
-#include <gr_scheduler_thread.h>
-#include <gr_local_sighandler.h>
-
-#include <stdexcept>
-#include <iostream>
-#include <string.h>
-#include <unistd.h>
-
-#define GR_TOP_BLOCK_IMPL_STS_DEBUG 0
-
-static gr_top_block_impl *s_impl = 0;
-
-
-// FIXME: This prevents using more than one gr_top_block instance
-
-static void 
-runtime_sigint_handler(int signum)
-{
-  if (GR_TOP_BLOCK_IMPL_STS_DEBUG){
-    char *msg = "SIGINT received, calling stop()\n";
-    ::write(1, msg, strlen(msg));	// write is OK to call from signal handler
-  }
-
-  if (s_impl)
-    s_impl->stop();
-}
-
-// ----------------------------------------------------------------
-
-gr_top_block_impl_sts::gr_top_block_impl_sts(gr_top_block *owner) 
-  : gr_top_block_impl(owner)
-{
-  if (s_impl)
-    throw std::logic_error("gr_top_block_impl_sts: multiple simultaneous gr_top_blocks not allowed");
-
-  s_impl = this;
-}
-
-gr_top_block_impl_sts::~gr_top_block_impl_sts()
-{
-  s_impl = 0; // don't call delete we don't own these
-}
-
-void
-gr_top_block_impl_sts::start_threads()
-{
-  if (GR_TOP_BLOCK_IMPL_STS_DEBUG)
-    std::cout << "start_threads: entered" << std::endl;
-
-  d_graphs = d_ffg->partition();
-  for (std::vector<gr_basic_block_vector_t>::iterator p = d_graphs.begin();
-       p != d_graphs.end(); p++) {
-    gr_scheduler_thread *thread = new gr_scheduler_thread(make_gr_block_vector(*p));
-    d_threads.push_back(thread);
-    if (GR_TOP_BLOCK_IMPL_STS_DEBUG)
-      std::cout << "start_threads: starting " << thread << std::endl;
-    thread->start();
-  }
-}
-
-/*
- * N.B. as currently implemented, it is possible that this may be
- * invoked by the SIGINT handler which is fragile as hell...
- */
-void
-gr_top_block_impl_sts::stop()
-{
-  if (GR_TOP_BLOCK_IMPL_STS_DEBUG){
-    char *msg = "stop: entered\n";
-    ::write(1, msg, strlen(msg));
-  }
-
-  for (gr_scheduler_thread_viter_t p = d_threads.begin(); p != d_threads.end(); p++) {
-    if (*p)
-      (*p)->stop();
-  }
-}
-
-void
-gr_top_block_impl_sts::wait()
-{
-  if (GR_TOP_BLOCK_IMPL_STS_DEBUG)
-    std::cout << "wait: entered" << std::endl;
-
-  void *dummy_status; // don't ever dereference this
-  gr_local_sighandler sigint(SIGINT, runtime_sigint_handler);
-
-  for (gr_scheduler_thread_viter_t p = d_threads.begin(); p != d_threads.end(); p++) {
-    if (GR_TOP_BLOCK_IMPL_STS_DEBUG)
-      std::cout << "wait: joining thread " << (*p) << std::endl;
-    (*p)->join(&dummy_status); // omnithreads will self-delete, so pointer is now dead
-    (*p) = 0; // FIXME: switch to stl::list and actually remove from container
-    if (GR_TOP_BLOCK_IMPL_STS_DEBUG)
-      std::cout << "wait: join returned" << std::endl;
-  }
-
-  d_threads.clear();
-  d_running = false;
-}
diff --git a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.h b/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.h
deleted file mode 100644
index ec2e51cf25..0000000000
--- a/gnuradio-core/src/lib/runtime/gr_top_block_impl_sts.h
+++ /dev/null
@@ -1,55 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2007 Free Software Foundation, Inc.
- * 
- * This file is part of GNU Radio
- * 
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- * 
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- * 
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING.  If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifndef INCLUDED_GR_TOP_BLOCK_IMPL_STS_H
-#define INCLUDED_GR_TOP_BLOCK_IMPL_STS_H
-
-#include <gr_top_block_impl.h>
-#include <gr_scheduler_thread.h>
-
-/*!
- *\brief Implementation details of gr_top_block
- * \ingroup internal
- *
- * Concrete implementation of gr_top_block using gr_single_threaded_scheduler.
- */
-class gr_top_block_impl_sts : public gr_top_block_impl
-{
-public:
-  gr_top_block_impl_sts(gr_top_block *owner);
-  ~gr_top_block_impl_sts();
-
-  // Signal scheduler threads to stop
-  void stop();
-
-  // Wait for scheduler threads to exit
-  void wait();
-
-private:
-    
-  gr_scheduler_thread_vector_t   d_threads;
-  std::vector<gr_basic_block_vector_t> d_graphs;
-
-  void start_threads();
-};
-
-#endif /* INCLUDED_GR_TOP_BLOCK_IMPL_STS_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
new file mode 100644
index 0000000000..02e8deed88
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.cc
@@ -0,0 +1,67 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_tpb_detail.h>
+#include <gr_block.h>
+#include <gr_block_detail.h>
+#include <gr_buffer.h>
+
+/*
+ * We assume that no worker threads are ever running when the
+ * graph structure is being manipulated, thus it's safe for us to poke
+ * around in our neighbors w/o holding any locks.
+ */
+
+void
+gr_tpb_detail::notify_upstream(gr_block_detail *d)
+{
+  // For each of our inputs, tell the guy upstream that we've consumed
+  // some input, and that he most likely has more output buffer space
+  // available.
+
+  for (size_t i = 0; i < d->d_input.size(); i++){
+    // Can you say, "pointer chasing?"
+    d->d_input[i]->buffer()->link()->detail()->d_tpb.set_output_changed();
+  }
+}
+
+void
+gr_tpb_detail::notify_downstream(gr_block_detail *d)
+{
+  // For each of our outputs, tell the guys downstream that they have
+  // new input available.
+
+  for (size_t i = 0; i < d->d_output.size(); i++){
+    gr_buffer_sptr buf = d->d_output[i];
+    for (size_t j = 0, k = buf->nreaders(); j < k; j++)
+      buf->reader(j)->link()->detail()->d_tpb.set_input_changed();
+  }
+}
+
+void
+gr_tpb_detail::notify_neighbors(gr_block_detail *d)
+{
+  notify_downstream(d);
+  notify_upstream(d);
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_detail.h b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
new file mode 100644
index 0000000000..9566312dc8
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_detail.h
@@ -0,0 +1,81 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GR_TPB_DETAIL_H
+#define INCLUDED_GR_TPB_DETAIL_H
+
+#include <boost/thread.hpp>
+
+class gr_block_detail;
+
+/*!
+ * \brief used by thread-per-block scheduler
+ */
+struct gr_tpb_detail {
+  typedef boost::unique_lock<boost::mutex>  scoped_lock;
+
+  boost::mutex			mutex;			//< protects all vars
+  bool				input_changed;
+  boost::condition_variable	input_cond;
+  bool				output_changed;
+  boost::condition_variable	output_cond;
+
+  gr_tpb_detail()
+    : input_changed(false), output_changed(false) {}
+
+
+  //! Called by us to tell all our upstream blocks that their output may have changed.
+  void notify_upstream(gr_block_detail *d);
+
+  //! Called by us to tell all our downstream blocks that their input may have changed.
+  void notify_downstream(gr_block_detail *d);
+
+  //! Called by us to notify both upstream and downstream
+  void notify_neighbors(gr_block_detail *d);
+
+  //! Called by us
+  void clear_changed()
+  {
+    scoped_lock	guard(mutex);
+    input_changed = false;
+    output_changed = false;
+  }
+
+private:
+
+  //! Used by notify_downstream
+  void set_input_changed()
+  {
+    scoped_lock	guard(mutex);
+    input_changed = true;
+    input_cond.notify_one();
+  }
+
+  //! Used by notify_upstream
+  void set_output_changed()
+  {
+    scoped_lock	guard(mutex);
+    output_changed = true;
+    output_cond.notify_one();
+  }
+
+};
+
+#endif /* INCLUDED_GR_TPB_DETAIL_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
new file mode 100644
index 0000000000..f61e172436
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -0,0 +1,76 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+#include <gr_tpb_thread_body.h>
+#include <iostream>
+
+gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block)
+  : d_exec(block)
+{
+  // std::cerr << "gr_tpb_thread_body: " << block << std::endl;
+
+  gr_block_detail	*d = block->detail().get();
+  gr_block_executor::state s;
+
+  while (1){
+    d->d_tpb.clear_changed();
+    s = d_exec.run_one_iteration();
+
+    switch(s){
+    case gr_block_executor::READY:		// Tell neighbors we made progress.
+      d->d_tpb.notify_neighbors(d);
+      break;
+
+    case gr_block_executor::READY_NO_OUTPUT:	// Notify upstream only
+      d->d_tpb.notify_upstream(d);
+      break;
+
+    case gr_block_executor::DONE:		// Game over.
+      d->d_tpb.notify_neighbors(d);
+      return;
+
+    case gr_block_executor::BLKD_IN:		// Wait for input.
+      {
+	gr_tpb_detail::scoped_lock guard(d->d_tpb.mutex);
+	while(!d->d_tpb.input_changed)
+	  d->d_tpb.input_cond.wait(guard);
+      }
+      break;
+      
+    case gr_block_executor::BLKD_OUT:		// Wait for output buffer space.
+      {
+	gr_tpb_detail::scoped_lock guard(d->d_tpb.mutex);
+	while(!d->d_tpb.output_changed)
+	  d->d_tpb.output_cond.wait(guard);
+      }
+      break;
+
+    default:
+      assert(0);
+    }
+  }
+}
+
+gr_tpb_thread_body::~gr_tpb_thread_body()
+{
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h
new file mode 100644
index 0000000000..a630b1be9f
--- /dev/null
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.h
@@ -0,0 +1,45 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2008 Free Software Foundation, Inc.
+ * 
+ * This file is part of GNU Radio
+ * 
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ * 
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+ */
+#ifndef INCLUDED_GR_TPB_THREAD_BODY_H
+#define INCLUDED_GR_TPB_THREAD_BODY_H
+
+#include <gr_block_executor.h>
+#include <gr_block.h>
+#include <gr_block_detail.h>
+
+/*!
+ * \brief The body of each thread-per-block thread.
+ *
+ * One of these is instantiated in its own thread for each block.  The
+ * constructor turns into the main loop which returns when the block is
+ * done or is interrupted.
+ */
+
+class gr_tpb_thread_body {
+  gr_block_executor	d_exec;
+
+public:
+  gr_tpb_thread_body(gr_block_sptr block);
+  ~gr_tpb_thread_body();
+};
+
+
+#endif /* INCLUDED_GR_TPB_THREAD_BODY_H */
diff --git a/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc b/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc
index ad40f724d0..7434cf657f 100644
--- a/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc
+++ b/gnuradio-core/src/lib/runtime/qa_gr_buffer.cc
@@ -52,7 +52,7 @@ t0_body ()
   int	nitems = 4000 / sizeof (int);
   int	counter = 0;
 
-  gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int)));
+  gr_buffer_sptr buf(gr_make_buffer(nitems, sizeof (int), gr_block_sptr()));
 
   int last_sa;
   int sa;
@@ -87,8 +87,8 @@ t1_body ()
   int	write_counter = 0;
   int	read_counter = 0;
 
-  gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int)));
-  gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0));
+  gr_buffer_sptr buf(gr_make_buffer(nitems, sizeof (int), gr_block_sptr()));
+  gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0, gr_block_sptr()));
   
 
   int sa;
@@ -162,8 +162,8 @@ t2_body ()
   
   int	nitems = (64 * (1L << 10)) / sizeof (int);	// 64K worth of ints
 
-  gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int)));
-  gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0));
+  gr_buffer_sptr buf(gr_make_buffer (nitems, sizeof (int), gr_block_sptr()));
+  gr_buffer_reader_sptr r1 (gr_buffer_add_reader (buf, 0, gr_block_sptr()));
 
   int	read_counter = 0;
   int	write_counter = 0;
@@ -229,7 +229,7 @@ t3_body ()
   int	nitems = (64 * (1L << 10)) / sizeof (int);
 
   static const int N = 5;
-  gr_buffer_sptr buf (gr_make_buffer (nitems, sizeof (int)));
+  gr_buffer_sptr buf(gr_make_buffer(nitems, sizeof (int), gr_block_sptr()));
   gr_buffer_reader_sptr 	reader[N];
   int			read_counter[N];
   int			write_counter = 0;
@@ -237,7 +237,7 @@ t3_body ()
 
   for (int i = 0; i < N; i++){
     read_counter[i] = 0;
-    reader[i] = gr_buffer_add_reader (buf, 0);
+    reader[i] = gr_buffer_add_reader (buf, 0, gr_block_sptr());
   }
 
   for (int lc = 0; lc < 1000; lc++){
diff --git a/gnuradio-core/src/python/gnuradio/gr/top_block.py b/gnuradio-core/src/python/gnuradio/gr/top_block.py
index 8f5754d657..a3161170ad 100644
--- a/gnuradio-core/src/python/gnuradio/gr/top_block.py
+++ b/gnuradio-core/src/python/gnuradio/gr/top_block.py
@@ -22,6 +22,33 @@
 from gnuradio_swig_python import top_block_swig, \
     top_block_wait_unlocked, top_block_run_unlocked
 
+#import gnuradio.gr.gr_threading as _threading
+import gr_threading as _threading
+
+
+#
+# There is no problem that can't be solved with an additional
+# level of indirection...
+#
+# This kludge allows ^C to interrupt top_block.run and top_block.wait
+#
+class _top_block_waiter(_threading.Thread):
+    def __init__(self, tb):
+        _threading.Thread.__init__(self)
+        self.setDaemon(1)
+        self.tb = tb
+        self.event = _threading.Event()
+        self.start()
+
+    def run(self):
+        top_block_wait_unlocked(self.tb)
+        self.event.set()
+
+    def wait(self):
+        while not self.event.isSet():
+            self.event.wait(0.100)
+
+
 #
 # This hack forces a 'has-a' relationship to look like an 'is-a' one.
 #
@@ -48,10 +75,12 @@ class top_block(object):
     	self._tb.stop()
 
     def run(self):
-        top_block_run_unlocked(self._tb)
+        self.start()
+        self.wait()
 
     def wait(self):
-        top_block_wait_unlocked(self._tb)
+        _top_block_waiter(self._tb).wait()
+
 
     # FIXME: these are duplicated from hier_block2.py; they should really be implemented
     # in the original C++ class (gr_hier_block2), then they would all be inherited here
diff --git a/gnuradio-core/src/tests/Makefile.am b/gnuradio-core/src/tests/Makefile.am
index 730de2d194..c6225d972f 100644
--- a/gnuradio-core/src/tests/Makefile.am
+++ b/gnuradio-core/src/tests/Makefile.am
@@ -47,10 +47,12 @@ noinst_PROGRAMS		= 	\
 	benchmark_vco		\
 	test_runtime		\
 	test_general		\
-	test_all		\
 	test_filter		\
 	test_vmcircbuf
 
+bin_PROGRAMS = \
+	test_all
+
 
 noinst_SCRIPTS = \
 	benchmark_dotprod
-- 
cgit v1.2.3