From 92cfb0240005675f4e7a55a81552f4c7a5128cd8 Mon Sep 17 00:00:00 2001
From: Tim O'Shea <tim.oshea753@gmail.com>
Date: Wed, 28 Nov 2012 15:15:58 -0800
Subject: core: adding msg_connect, updating msg interface, adding symbolic
 block names

---
 grc/python/Connection.py   |  3 +++
 grc/python/Constants.py    |  1 +
 grc/python/Generator.py    |  4 +++-
 grc/python/flow_graph.tmpl | 11 +++++++++++
 4 files changed, 18 insertions(+), 1 deletion(-)

(limited to 'grc')

diff --git a/grc/python/Connection.py b/grc/python/Connection.py
index 218baf0743..341dd2d821 100644
--- a/grc/python/Connection.py
+++ b/grc/python/Connection.py
@@ -31,6 +31,9 @@ class Connection(_Connection, _GUIConnection):
 	def is_msg(self):
 		return self.get_source().get_type() == self.get_sink().get_type() == 'msg'
 
+	def is_message(self):
+		return self.get_source().get_type() == self.get_sink().get_type() == 'message'
+
 	def validate(self):
 		"""
 		Validate the connections.
diff --git a/grc/python/Constants.py b/grc/python/Constants.py
index 1a65caf1c0..09c3081967 100644
--- a/grc/python/Constants.py
+++ b/grc/python/Constants.py
@@ -58,6 +58,7 @@ CORE_TYPES = ( #name, key, sizeof, color
 	('Integer 16', 's16', 2, '#FFFF66'),
 	('Integer 8', 's8', 1, '#FF66FF'),
 	('Message Queue', 'msg', 0, '#777777'),
+	('Async Message', 'message', 0, '#777777'),
 	('Wildcard', '', 0, '#FFFFFF'),
 )
 
diff --git a/grc/python/Generator.py b/grc/python/Generator.py
index 2a6fe51d5d..616ea00fcb 100644
--- a/grc/python/Generator.py
+++ b/grc/python/Generator.py
@@ -116,8 +116,9 @@ Add a Misc->Throttle block to your flow graph to avoid CPU congestion.''')
 		#list of regular blocks (all blocks minus the special ones)
 		blocks = filter(lambda b: b not in (imports + parameters), blocks)
 		#list of connections where each endpoint is enabled
-		connections = filter(lambda c: not c.is_msg(), self._flow_graph.get_enabled_connections())
+		connections = filter(lambda c: not (c.is_msg() or c.is_message()), self._flow_graph.get_enabled_connections())
 		messages = filter(lambda c: c.is_msg(), self._flow_graph.get_enabled_connections())
+		messages2 = filter(lambda c: c.is_message(), self._flow_graph.get_enabled_connections())
 		#list of variable names
 		var_ids = [var.get_id() for var in parameters + variables]
 		#prepend self.
@@ -142,6 +143,7 @@ Add a Misc->Throttle block to your flow graph to avoid CPU congestion.''')
 			'blocks': blocks,
 			'connections': connections,
 			'messages': messages,
+			'messages2': messages2,
 			'generate_options': self._generate_options,
 			'var_id2cbs': var_id2cbs,
 		}
diff --git a/grc/python/flow_graph.tmpl b/grc/python/flow_graph.tmpl
index 17feb01f65..af55ad641a 100644
--- a/grc/python/flow_graph.tmpl
+++ b/grc/python/flow_graph.tmpl
@@ -189,6 +189,17 @@ gr.io_signaturev($(len($io_sigs)), $(len($io_sigs)), [$(', '.join($size_strs))])
 		self.connect($make_port_sig($source), $make_port_sig($sink))
 	#end if
 #end for
+########################################################
+##Create Asynch Message Connections
+########################################################
+#if $messages2
+		$DIVIDER
+		# Asynch Message Connections
+		$DIVIDER
+#end if
+#for $msg in $messages2
+		self.msg_connect(self.$msg.get_source().get_parent().get_id(), "$msg.get_source().get_name()", self.$msg.get_sink().get_parent().get_id(), "$msg.get_sink().get_name()")
+#end for
 
 ########################################################
 ##Create Callbacks
-- 
cgit v1.2.3


From 6cc818260128df57c51a41e4e6aa459de5faf4fe Mon Sep 17 00:00:00 2001
From: Tim O'Shea <tim.oshea753@gmail.com>
Date: Fri, 30 Nov 2012 22:31:43 -0800
Subject: core: gr_blocks can now have only message ports with no
 general_work()

* msg only blocks now get thread context

* added blocking msg queue delete call

* added gr_message_strobe block

* added grc definitions for message_debug, message_strobe, pdu_to_tagged_stream, tagged_stream_to_pdu.

* allow message fan-in connections in GRC
---
 gnuradio-core/src/lib/general/CMakeLists.txt       |  1 +
 gnuradio-core/src/lib/general/general.i            |  2 +
 gnuradio-core/src/lib/general/gr_message_strobe.cc | 83 ++++++++++++++++++++++
 gnuradio-core/src/lib/general/gr_message_strobe.h  | 65 +++++++++++++++++
 gnuradio-core/src/lib/general/gr_message_strobe.i  | 30 ++++++++
 gnuradio-core/src/lib/io/gr_message_debug.cc       |  4 +-
 .../src/lib/io/gr_pdu_to_tagged_stream.cc          |  7 +-
 gnuradio-core/src/lib/runtime/gr_basic_block.cc    | 29 ++++++--
 gnuradio-core/src/lib/runtime/gr_basic_block.h     |  9 +++
 gnuradio-core/src/lib/runtime/gr_block.cc          |  9 +++
 gnuradio-core/src/lib/runtime/gr_block.h           |  2 +-
 gnuradio-core/src/lib/runtime/gr_flowgraph.cc      |  9 +++
 gnuradio-core/src/lib/runtime/gr_flowgraph.h       |  3 +
 .../src/lib/runtime/gr_hier_block2_detail.cc       |  9 +++
 .../src/lib/runtime/gr_tpb_thread_body.cc          |  9 ++-
 .../src/lib/runtime/qa_set_msg_handler.cc          |  5 --
 grc/blocks/block_tree.xml                          |  7 ++
 grc/blocks/gr_message_debug.xml                    | 17 +++++
 grc/blocks/gr_message_strobe.xml                   | 35 +++++++++
 grc/blocks/gr_pdu_to_tagged_stream.xml             | 40 +++++++++++
 grc/blocks/gr_tagged_stream_to_pdu.xml             | 40 +++++++++++
 grc/python/Constants.py                            |  2 +-
 grc/python/Port.py                                 |  2 +-
 gruel/src/include/gruel/msg_accepter.h             |  2 +-
 24 files changed, 401 insertions(+), 20 deletions(-)
 create mode 100644 gnuradio-core/src/lib/general/gr_message_strobe.cc
 create mode 100644 gnuradio-core/src/lib/general/gr_message_strobe.h
 create mode 100644 gnuradio-core/src/lib/general/gr_message_strobe.i
 create mode 100644 grc/blocks/gr_message_debug.xml
 create mode 100644 grc/blocks/gr_message_strobe.xml
 create mode 100644 grc/blocks/gr_pdu_to_tagged_stream.xml
 create mode 100644 grc/blocks/gr_tagged_stream_to_pdu.xml

(limited to 'grc')

diff --git a/gnuradio-core/src/lib/general/CMakeLists.txt b/gnuradio-core/src/lib/general/CMakeLists.txt
index 074f583a74..4c99acfc36 100644
--- a/gnuradio-core/src/lib/general/CMakeLists.txt
+++ b/gnuradio-core/src/lib/general/CMakeLists.txt
@@ -299,6 +299,7 @@ set(gr_core_general_triple_threats
     gr_burst_tagger
     gr_correlate_access_code_tag_bb
     gr_tag_debug
+    gr_message_strobe
 )
 
 foreach(file_tt ${gr_core_general_triple_threats})
diff --git a/gnuradio-core/src/lib/general/general.i b/gnuradio-core/src/lib/general/general.i
index e5a9e970dd..1446088a2c 100644
--- a/gnuradio-core/src/lib/general/general.i
+++ b/gnuradio-core/src/lib/general/general.i
@@ -143,6 +143,7 @@
 #include <gr_add_ff.h>
 #include <gr_vector_map.h>
 #include <gr_tag_debug.h>
+#include <gr_message_strobe.h>
 %}
 
 %include "gri_control_loop.i"
@@ -267,3 +268,4 @@
 %include "gr_vector_map.i"
 %include "gr_tag_debug.i"
 %include "gr_block_gateway.i"
+%include "gr_message_strobe.i"
diff --git a/gnuradio-core/src/lib/general/gr_message_strobe.cc b/gnuradio-core/src/lib/general/gr_message_strobe.cc
new file mode 100644
index 0000000000..371f472efd
--- /dev/null
+++ b/gnuradio-core/src/lib/general/gr_message_strobe.cc
@@ -0,0 +1,83 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2005,2010 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gr_message_strobe.h>
+#include <gr_io_signature.h>
+#include <cstdio>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <stdexcept>
+#include <string.h>
+#include <iostream>
+
+// public constructor that returns a shared_ptr
+
+gr_message_strobe_sptr
+gr_make_message_strobe (pmt::pmt_t msg, float period_ms)
+{
+  return gnuradio::get_initial_sptr(new gr_message_strobe(msg, period_ms));
+}
+
+gr_message_strobe::gr_message_strobe (pmt::pmt_t msg, float period_ms)
+  : gr_sync_block("message_strobe",
+		  gr_make_io_signature(0, 0, 0),
+		  gr_make_io_signature(0, 0, 0)),
+    d_finished(false),
+    d_period_ms(period_ms),
+    d_msg(msg)
+{
+    message_port_register_out(pmt::mp("strobe"));
+    d_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&gr_message_strobe::run, this)));
+
+    message_port_register_in(pmt::mp("set_msg"));
+    set_msg_handler(pmt::mp("set_msg"), boost::bind(&gr_message_strobe::set_msg, this, _1));
+}
+
+gr_message_strobe::~gr_message_strobe()
+{
+    d_finished = true;
+    d_thread->interrupt();
+    d_thread->join();
+}
+
+void gr_message_strobe::run(){
+    while(!d_finished) {
+        boost::this_thread::sleep(boost::posix_time::milliseconds(d_period_ms)); 
+        if(d_finished){ return; }
+//        std::cout << "strobing...\n";
+        message_port_pub( pmt::mp("strobe"), d_msg );
+    } 
+}
+
+int
+gr_message_strobe::work(int noutput_items,
+		      gr_vector_const_void_star &input_items,
+		      gr_vector_void_star &output_items)
+{
+  return 0; // FIXME: replace with default NOP work function in gr_block
+}
diff --git a/gnuradio-core/src/lib/general/gr_message_strobe.h b/gnuradio-core/src/lib/general/gr_message_strobe.h
new file mode 100644
index 0000000000..a5151a30b2
--- /dev/null
+++ b/gnuradio-core/src/lib/general/gr_message_strobe.h
@@ -0,0 +1,65 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2005 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_MESSAGE_STROBE_H
+#define INCLUDED_GR_MESSAGE_STROBE_H
+
+#include <gr_core_api.h>
+#include <gr_sync_block.h>
+#include <gr_message.h>
+#include <gr_msg_queue.h>
+
+class gr_message_strobe;
+typedef boost::shared_ptr<gr_message_strobe> gr_message_strobe_sptr;
+
+GR_CORE_API gr_message_strobe_sptr gr_make_message_strobe (pmt::pmt_t msg, float period_ms);
+
+/*!
+ * \brief Gather received items into messages and insert into msgq
+ * \ingroup sink_blk
+ */
+class GR_CORE_API gr_message_strobe : public gr_sync_block
+{
+ private:
+  friend GR_CORE_API gr_message_strobe_sptr
+  gr_make_message_strobe(pmt::pmt_t msg, float period_ms);
+  boost::shared_ptr<boost::thread> d_thread;
+  bool d_finished;
+  float d_period_ms;
+  pmt::pmt_t d_msg;
+
+  void run();
+
+ protected:
+  gr_message_strobe (pmt::pmt_t msg, float period_ms);
+
+ public:
+  ~gr_message_strobe ();
+
+  void set_msg(pmt::pmt_t msg){ d_msg = msg; }
+
+  int work (int noutput_items,
+	    gr_vector_const_void_star &input_items,
+	    gr_vector_void_star &output_items);
+};
+
+#endif /* INCLUDED_GR_MESSAGE_STROBE_H */
diff --git a/gnuradio-core/src/lib/general/gr_message_strobe.i b/gnuradio-core/src/lib/general/gr_message_strobe.i
new file mode 100644
index 0000000000..490aa8e8a1
--- /dev/null
+++ b/gnuradio-core/src/lib/general/gr_message_strobe.i
@@ -0,0 +1,30 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2005 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+GR_SWIG_BLOCK_MAGIC(gr,message_strobe);
+
+%{
+#include <gr_message_strobe.h>
+%}
+
+%include "gr_message_strobe.h"
+
diff --git a/gnuradio-core/src/lib/io/gr_message_debug.cc b/gnuradio-core/src/lib/io/gr_message_debug.cc
index 84c11c46eb..99f4a1f7b8 100644
--- a/gnuradio-core/src/lib/io/gr_message_debug.cc
+++ b/gnuradio-core/src/lib/io/gr_message_debug.cc
@@ -44,8 +44,9 @@ gr_make_message_debug ()
 }
 
 void gr_message_debug::print(pmt::pmt_t msg){
-    std::cout << "******* DEBUG PRINT ********\n";
+    std::cout << "******* MESSAGE DEBUG PRINT ********\n";
     pmt::pmt_print(msg);
+    std::cout << "************************************\n";
 }
 
 
@@ -67,5 +68,6 @@ gr_message_debug::work(int noutput_items,
 		      gr_vector_const_void_star &input_items,
 		      gr_vector_void_star &output_items)
 {
+  printf("gr_message_debug::work\n");
   return 0; // FIXME: replace with default NOP work function in gr_block
 }
diff --git a/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc b/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc
index 26c1babd68..06a1c95969 100644
--- a/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc
+++ b/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc
@@ -77,7 +77,8 @@ gr_pdu_to_tagged_stream::work(int noutput_items,
   if(noutput_items > 0){
 
     // grab a message if one exists
-    pmt::pmt_t msg( delete_head_nowait( pdu_port_id ) );
+    //pmt::pmt_t msg( delete_head_nowait( pdu_port_id ) );
+    pmt::pmt_t msg( delete_head_blocking( pdu_port_id ) );
     if(msg.get() == NULL ){
         return nout;
         }
@@ -87,8 +88,8 @@ gr_pdu_to_tagged_stream::work(int noutput_items,
         throw std::runtime_error("received a malformed pdu message!");
         }
    
-    printf("got a msg\n");
-    pmt::pmt_print(msg);
+//    printf("got a msg\n");
+//    pmt::pmt_print(msg);
  
     // grab the components of the pdu message
     pmt::pmt_t meta(pmt::pmt_car(msg)); // make sure this is NIL || Dict ?
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
index 7d2f275e86..69f2e09f98 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
@@ -1,6 +1,6 @@
 /* -*- c++ -*- */
 /*
- * Copyright 2006 Free Software Foundation, Inc.
+ * Copyright 2006,2012 Free Software Foundation, Inc.
  *
  * This file is part of GNU Radio
  *
@@ -28,6 +28,7 @@
 #include <gr_block_registry.h>
 #include <stdexcept>
 #include <sstream>
+#include <iostream>
 
 using namespace pmt;
 
@@ -79,6 +80,7 @@ gr_basic_block::set_block_alias(std::string name)
 //  - register a new input message port
 void gr_basic_block::message_port_register_in(pmt::pmt_t port_id){
     msg_queue[port_id] = msg_queue_t();
+    msg_queue_ready[port_id] = boost::shared_ptr<boost::condition_variable>(new boost::condition_variable());
     }
 
 //  - register a new output message port
@@ -131,7 +133,6 @@ void
 gr_basic_block::_post(pmt_t which_port, pmt_t msg)
 {
   insert_tail(which_port, msg);
-  global_block_registry.notify_blk(alias());
 }
 
 void
@@ -139,12 +140,16 @@ gr_basic_block::insert_tail(pmt::pmt_t which_port, pmt::pmt_t msg)
 {
   gruel::scoped_lock guard(mutex);
 
+    if( (msg_queue.find(which_port) == msg_queue.end()) || (msg_queue_ready.find(which_port) == msg_queue_ready.end())){
+        std::cout << "target port = " << pmt::pmt_symbol_to_string(which_port) << std::endl;
+        throw std::runtime_error("attempted to insert_tail on invalid queue!");
+    }
+
   msg_queue[which_port].push_back(msg);
+  msg_queue_ready[which_port]->notify_one();
 
   // wake up thread if BLKD_IN or BLKD_OUT
-  //input_cond.notify_one();
-  //output_cond.notify_one();
-  // TODO: reconsider the need for notification of input and output conditions!
+  global_block_registry.notify_blk(alias());
 }
 
 pmt_t
@@ -162,4 +167,18 @@ gr_basic_block::delete_head_nowait(pmt::pmt_t which_port)
   return m;
 }
 
+pmt_t
+gr_basic_block::delete_head_blocking(pmt::pmt_t which_port)
+{
+  gruel::scoped_lock guard(mutex);
+
+  while (empty_p(which_port)){
+    msg_queue_ready[which_port]->wait(guard);
+    }
+
+  pmt_t m(msg_queue[which_port].front());
+  msg_queue[which_port].pop_front();
+  return m;
+}
+
 
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index 2ee8161c1d..e0fd5d2afd 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -35,6 +35,7 @@
 #include <gr_io_signature.h>
 #include <gruel/thread.h>
 #include <boost/foreach.hpp>
+#include <boost/thread/condition_variable.hpp>
 
 /*!
  * \brief The abstract base class for all signal processing blocks.
@@ -72,6 +73,9 @@ private:
     typedef std::deque<pmt::pmt_t>    msg_queue_t;
     typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>    msg_queue_map_t;
     msg_queue_map_t msg_queue;
+//    boost::condition_variable msg_queue_ready;
+    std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready;
+
     gruel::mutex          mutex;          //< protects all vars
 
 
@@ -163,6 +167,11 @@ public:
      */
     pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
 
+    /*!
+     * \returns returns pmt at head of queue or pmt_t() if empty.
+     */
+    pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port);
+
     msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
         return msg_queue[which_port].begin();
         }
diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc
index dc77128a39..43aebf0bfd 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -251,3 +251,12 @@ operator << (std::ostream& os, const gr_block *m)
   return os;
 }
 
+int
+gr_block::general_work(int noutput_items,
+		       gr_vector_int &ninput_items,
+		       gr_vector_const_void_star &input_items,
+		       gr_vector_void_star &output_items)
+{
+  throw std::runtime_error("gr_block::general_work() not implemented");
+  return 0;
+}
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h
index 98339080d0..57e3fda90a 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -124,7 +124,7 @@ class GR_CORE_API gr_block : public gr_basic_block {
   virtual int general_work (int noutput_items,
 			    gr_vector_int &ninput_items,
 			    gr_vector_const_void_star &input_items,
-			    gr_vector_void_star &output_items) = 0;
+			    gr_vector_void_star &output_items);
 
   /*!
    * \brief Called to enable drivers, etc for i/o devices.
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
index 78e1bc99af..69c304a3d8 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
@@ -180,6 +180,11 @@ gr_flowgraph::calc_used_blocks()
 {
   gr_basic_block_vector_t tmp;
 
+  // make sure free standing message blocks are included
+  for (gr_basic_block_vector_t::iterator it=d_msgblocks.begin(); it!=d_msgblocks.end(); it++){
+    tmp.push_back(*it);
+  }
+
   // Collect all blocks in the edge list
   for (gr_edge_viter_t p = d_edges.begin(); p != d_edges.end(); p++) {
     tmp.push_back(p->src().block());
@@ -472,3 +477,7 @@ gr_flowgraph::topological_dfs_visit(gr_basic_block_sptr block, gr_basic_block_ve
   output.push_back(block);
 }
 
+void gr_flowgraph::add_msg_block(gr_basic_block_sptr blk){
+    d_msgblocks.push_back(blk);
+}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
index a2c1580eb8..860cb0ff1e 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
@@ -110,6 +110,8 @@ public:
   void disconnect(gr_basic_block_sptr src_block, int src_port,
 		  gr_basic_block_sptr dst_block, int dst_port);
 
+  void add_msg_block(gr_basic_block_sptr blk);
+
   // Validate connectivity, raise exception if invalid
   void validate();
 
@@ -128,6 +130,7 @@ public:
   // Return vector of vectors of disjointly connected blocks, topologically
   // sorted.
   std::vector<gr_basic_block_vector_t> partition();
+  gr_basic_block_vector_t d_msgblocks;
 
 protected:
   gr_basic_block_vector_t d_blocks;
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
index 099b2f8e88..ff2a5db8cc 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
@@ -152,6 +152,14 @@ gr_hier_block2_detail::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
     
   // register the subscription
   src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
+
+  // add block uniquely to list to internal blocks
+  if (std::find(d_blocks.begin(), d_blocks.end(), dst) == d_blocks.end()){
+    d_blocks.push_back(dst);
+    }
+
+  // make sure we instantiate a thread for this block
+  d_fg->add_msg_block(dst);
 }
 
 void
@@ -449,6 +457,7 @@ gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const
       }
     }
   }
+  sfg->d_msgblocks = d_fg->d_msgblocks;
 
   // Construct unique list of blocks used either in edges, inputs,
   // outputs, or by themselves.  I still hate STL.
diff --git a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
index ff2afca103..9f17a48a80 100644
--- a/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
+++ b/gnuradio-core/src/lib/runtime/gr_tpb_thread_body.cc
@@ -32,7 +32,7 @@ using namespace pmt;
 gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_items)
   : d_exec(block, max_noutput_items)
 {
-  // std::cerr << "gr_tpb_thread_body: " << block << std::endl;
+  //std::cerr << "gr_tpb_thread_body: " << block << std::endl;
 
   gr_block_detail *d = block->detail().get();
   gr_block_executor::state s;
@@ -53,7 +53,12 @@ gr_tpb_thread_body::gr_tpb_thread_body(gr_block_sptr block, int max_noutput_item
     }
 
     d->d_tpb.clear_changed();
-    s = d_exec.run_one_iteration();
+    // run one iteration if we are a connected stream block
+    if(d->noutputs() >0 || d->ninputs()>0){
+        s = d_exec.run_one_iteration();
+    } else {
+        s = gr_block_executor::BLKD_IN;
+    }
 
     switch(s){
     case gr_block_executor::READY:		// Tell neighbors we made progress.
diff --git a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc
index dc8f0f8a95..c84a219bd1 100644
--- a/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc
+++ b/gnuradio-core/src/lib/runtime/qa_set_msg_handler.cc
@@ -70,11 +70,6 @@ void qa_set_msg_handler::t0()
     send(nop, port, mp(mp("example-msg"), mp(i)));
   }
 
-  // And send a message to null_source to confirm that the default
-  // message handling action (which should be a nop) doesn't dump
-  // core.
-  send(src, port, mp(mp("example-msg"), mp(0)));
-
   // Give the messages a chance to be processed
   boost::this_thread::sleep(boost::posix_time::milliseconds(100));
 
diff --git a/grc/blocks/block_tree.xml b/grc/blocks/block_tree.xml
index d7ec82e4ab..95bd7bb3ce 100644
--- a/grc/blocks/block_tree.xml
+++ b/grc/blocks/block_tree.xml
@@ -37,6 +37,13 @@
 		<block>virtual_sink</block>
 		<block>gr_tag_debug</block>
 	</cat>
+    <cat>
+        <name>Message Tools</name>
+        <block>gr_message_debug</block>
+        <block>gr_message_strobe</block>
+        <block>gr_pdu_to_tagged_stream</block>
+        <block>gr_tagged_stream_to_pdu</block>
+    </cat>
 	<cat>
 		<name>Operators</name>
 		<block>gr_add_xx</block>
diff --git a/grc/blocks/gr_message_debug.xml b/grc/blocks/gr_message_debug.xml
new file mode 100644
index 0000000000..9478f53045
--- /dev/null
+++ b/grc/blocks/gr_message_debug.xml
@@ -0,0 +1,17 @@
+<?xml version="1.0"?>
+<!--
+###################################################
+##Message Debug
+###################################################
+ -->
+<block>
+	<name>Message Debug</name>
+	<key>gr_message_debug</key>
+	<import>from gnuradio import gr</import>
+	<make>gr.message_debug()</make>
+	<sink>
+		<name>print</name>
+		<type>message</type>
+        <optional>1</optional>
+	</sink>
+</block>
diff --git a/grc/blocks/gr_message_strobe.xml b/grc/blocks/gr_message_strobe.xml
new file mode 100644
index 0000000000..60a7724dfc
--- /dev/null
+++ b/grc/blocks/gr_message_strobe.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0"?>
+<!--
+###################################################
+##Message Strobe 
+###################################################
+ -->
+<block>
+	<name>Message Strobe</name>
+	<key>gr_message_strobe</key>
+	<import>from gnuradio import gr</import>
+	<import>from gruel import pmt</import>
+	<make>gr.message_strobe($msg, $period)</make>
+	<param>
+		<name>Message PMT</name>
+		<key>msg</key>
+		<value>pmt.pmt_intern("TEST")</value>
+		<type>raw</type>
+	</param>
+	<param>
+		<name>Period (ms)</name>
+		<key>period</key>
+		<value>1000</value>
+		<type>real</type>
+	</param>
+	<sink>
+		<name>set_msg</name>
+		<type>message</type>
+        <optional>1</optional>
+	</sink>
+	<source>
+		<name>strobe</name>
+		<type>message</type>
+        <optional>1</optional>
+	</source>
+</block>
diff --git a/grc/blocks/gr_pdu_to_tagged_stream.xml b/grc/blocks/gr_pdu_to_tagged_stream.xml
new file mode 100644
index 0000000000..fc1c4d16a3
--- /dev/null
+++ b/grc/blocks/gr_pdu_to_tagged_stream.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+<!--
+###################################################
+## PDU Message to Tagged Stream
+###################################################
+ -->
+<block>
+	<name>PDU to Tagged Stream</name>
+	<key>gr_pdu_to_tagged_stream</key>
+	<import>from gnuradio import gr</import>
+	<make>gr.pdu_to_tagged_stream($type.tv)</make>
+    <param>
+        <name>Item Type</name>
+        <key>type</key>
+        <type>enum</type>
+        <option>
+            <name>Byte</name>
+            <key>byte</key>
+            <opt>tv:gr.BYTE</opt>
+        </option>
+        <option>
+            <name>Complex</name>
+            <key>complex</key>
+            <opt>tv:gr.COMPLEX</opt>
+        </option>
+        <option>
+            <name>Float</name>
+            <key>float</key>
+            <opt>tv:gr.FLOAT</opt>
+        </option>
+    </param>
+	<sink>
+		<name>pdus</name>
+		<type>message</type>
+	</sink>
+	<source>
+		<name>out</name>
+		<type>$type</type>
+	</source>
+</block>
diff --git a/grc/blocks/gr_tagged_stream_to_pdu.xml b/grc/blocks/gr_tagged_stream_to_pdu.xml
new file mode 100644
index 0000000000..e70a016080
--- /dev/null
+++ b/grc/blocks/gr_tagged_stream_to_pdu.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+<!--
+###################################################
+## Tagged Stream to PDU Message
+###################################################
+ -->
+<block>
+	<name>Tagged Stream to PDU</name>
+	<key>gr_tagged_stream_to_pdu</key>
+	<import>from gnuradio import gr</import>
+	<make>gr.tagged_stream_to_pdu($type.tv)</make>
+    <param>
+        <name>Item Type</name>
+        <key>type</key>
+        <type>enum</type>
+        <option>
+            <name>Byte</name>
+            <key>byte</key>
+            <opt>tv:gr.BYTE</opt>
+        </option>
+        <option>
+            <name>Complex</name>
+            <key>complex</key>
+            <opt>tv:gr.COMPLEX</opt>
+        </option>
+        <option>
+            <name>Float</name>
+            <key>float</key>
+            <opt>tv:gr.FLOAT</opt>
+        </option>
+    </param>
+	<sink>
+		<name>in</name>
+		<type>$type</type>
+	</sink>
+	<source>
+		<name>pdus</name>
+		<type>message</type>
+	</source>
+</block>
diff --git a/grc/python/Constants.py b/grc/python/Constants.py
index 09c3081967..b8dc9a96a1 100644
--- a/grc/python/Constants.py
+++ b/grc/python/Constants.py
@@ -58,7 +58,7 @@ CORE_TYPES = ( #name, key, sizeof, color
 	('Integer 16', 's16', 2, '#FFFF66'),
 	('Integer 8', 's8', 1, '#FF66FF'),
 	('Message Queue', 'msg', 0, '#777777'),
-	('Async Message', 'message', 0, '#777777'),
+	('Async Message', 'message', 0, '#C0C0C0'),
 	('Wildcard', '', 0, '#FFFFFF'),
 )
 
diff --git a/grc/python/Port.py b/grc/python/Port.py
index 9f8b50d052..738a33ba72 100644
--- a/grc/python/Port.py
+++ b/grc/python/Port.py
@@ -116,7 +116,7 @@ class Port(_Port, _GUIPort):
 		_Port.validate(self)
 		if not self.get_enabled_connections() and not self.get_optional():
 			self.add_error_message('Port is not connected.')
-		if not self.is_source() and len(self.get_enabled_connections()) > 1:
+		if not self.is_source() and (not self.get_type() == "message") and len(self.get_enabled_connections()) > 1:
 			self.add_error_message('Port has too many connections.')
 		#message port logic
 		if self.get_type() == 'msg':
diff --git a/gruel/src/include/gruel/msg_accepter.h b/gruel/src/include/gruel/msg_accepter.h
index 65abd5a6b8..45acb3c784 100644
--- a/gruel/src/include/gruel/msg_accepter.h
+++ b/gruel/src/include/gruel/msg_accepter.h
@@ -37,7 +37,7 @@ namespace gruel {
     virtual ~msg_accepter();
 
     /*!
-     * \brief send \p msg to \p msg_accepter
+     * \brief send \p msg to \p msg_accepter on port \p which_port
      *
      * Sending a message is an asynchronous operation.  The \p post
      * call will not wait for the message either to arrive at the
-- 
cgit v1.2.3


From 53be45f118e6e73d2a50fe0ba4622d6dfe96117c Mon Sep 17 00:00:00 2001
From: Tom Rondeau <trondeau@vt.edu>
Date: Thu, 6 Dec 2012 12:52:35 -0500
Subject: core: updated the message debug block to have a 'store' port where
 messages can be retrieved afterwards.

Updated qa_pdu to use the new 'store' port for testing the resulting message.
---
 gnuradio-core/src/lib/io/gr_message_debug.cc   | 39 +++++++++++++++++---
 gnuradio-core/src/lib/io/gr_message_debug.h    | 49 +++++++++++++++++++++++++-
 gnuradio-core/src/python/gnuradio/gr/qa_pdu.py | 49 +++++++++++++++++---------
 grc/blocks/gr_message_debug.xml                |  7 +++-
 4 files changed, 121 insertions(+), 23 deletions(-)

(limited to 'grc')

diff --git a/gnuradio-core/src/lib/io/gr_message_debug.cc b/gnuradio-core/src/lib/io/gr_message_debug.cc
index d98954576a..7d28ff18e9 100644
--- a/gnuradio-core/src/lib/io/gr_message_debug.cc
+++ b/gnuradio-core/src/lib/io/gr_message_debug.cc
@@ -43,20 +43,49 @@ gr_make_message_debug ()
   return gnuradio::get_initial_sptr(new gr_message_debug());
 }
 
-void gr_message_debug::print(pmt::pmt_t msg){
-    std::cout << "******* MESSAGE DEBUG PRINT ********\n";
-    pmt::pmt_print(msg);
-    std::cout << "************************************\n";
+void
+gr_message_debug::print(pmt::pmt_t msg)
+{
+  std::cout << "******* MESSAGE DEBUG PRINT ********\n";
+  pmt::pmt_print(msg);
+  std::cout << "************************************\n";
+}
+
+void
+gr_message_debug::store(pmt::pmt_t msg)
+{
+  gruel::scoped_lock guard(d_mutex);
+  d_messages.push_back(msg);
+}
+
+int
+gr_message_debug::num_messages()
+{
+  return (int)d_messages.size();
 }
 
+pmt::pmt_t
+gr_message_debug::get_message(int i)
+{
+  gruel::scoped_lock guard(d_mutex);
+
+  if((size_t)i >= d_messages.size()) {
+    throw std::runtime_error("gr_message_debug: index for message out of bounds.\n");
+  }
 
-gr_message_debug::gr_message_debug ()
+  return d_messages[i];
+}
+
+gr_message_debug::gr_message_debug()
   : gr_block("message_debug",
 	     gr_make_io_signature(0, 0, 0),
 	     gr_make_io_signature(0, 0, 0))
 {
     message_port_register_in(pmt::mp("print"));
     set_msg_handler(pmt::mp("print"), boost::bind(&gr_message_debug::print, this, _1));
+
+    message_port_register_in(pmt::mp("store"));
+    set_msg_handler(pmt::mp("store"), boost::bind(&gr_message_debug::store, this, _1));
 }
 
 gr_message_debug::~gr_message_debug()
diff --git a/gnuradio-core/src/lib/io/gr_message_debug.h b/gnuradio-core/src/lib/io/gr_message_debug.h
index 120694a916..1ffef1b023 100644
--- a/gnuradio-core/src/lib/io/gr_message_debug.h
+++ b/gnuradio-core/src/lib/io/gr_message_debug.h
@@ -27,11 +27,12 @@
 #include <gr_block.h>
 #include <gr_message.h>
 #include <gr_msg_queue.h>
+#include <gruel/thread.h>
 
 class gr_message_debug;
 typedef boost::shared_ptr<gr_message_debug> gr_message_debug_sptr;
 
-GR_CORE_API gr_message_debug_sptr gr_make_message_debug ();
+GR_CORE_API gr_message_debug_sptr gr_make_message_debug();
 
 /*!
  * \brief Print received messages to stdout
@@ -43,13 +44,59 @@ class GR_CORE_API gr_message_debug : public gr_block
   friend GR_CORE_API gr_message_debug_sptr
   gr_make_message_debug();
 
+  /*!
+   * \brief Messages received in this port are printed to stdout.
+   *
+   * This port receives messages from the scheduler's message handling
+   * mechanism and prints it to stdout. This message handler function
+   * is only meant to be used by the scheduler to handle messages
+   * posted to port 'print'.
+   *
+   * \param msg A pmt message passed from the scheduler's message handling.
+   */
   void print(pmt::pmt_t msg);
 
+  /*!
+   * \brief Messages received in this port are stored in a vector.
+   *
+   * This port receives messages from the scheduler's message handling
+   * mechanism and stores it in a vector. Messages can be retrieved
+   * later using the 'get_message' function. This message handler
+   * function is only meant to be used by the scheduler to handle
+   * messages posted to port 'store'.
+   *
+   * \param msg A pmt message passed from the scheduler's message handling.
+   */
+  void store(pmt::pmt_t msg);
+
+  gruel::mutex d_mutex;
+  std::vector<pmt::pmt_t> d_messages;
+
  protected:
   gr_message_debug ();
 
  public:
   ~gr_message_debug ();
+
+  /*! 
+   * \brief Reports the number of messages received by this block.
+   */
+  int num_messages();
+
+  /*!
+   * \brief Get a message (as a PMT) from the message vector at index \p i.
+   *
+   * Messages passed to the 'store' port will be stored in a
+   * vector. This function retrieves those messages by index. They are
+   * index in order of when they were received (all messages are just
+   * pushed onto the back of a vector). This is mostly useful in
+   * debugging message passing graphs and in QA code.
+   *
+   * \param i The index in the vector for the message to retrieve.
+   *
+   * \return a message at index \p i as a pmt_t.
+   */
+  pmt::pmt_t get_message(int i);
 };
 
 #endif /* INCLUDED_GR_MESSAGE_DEBUG_H */
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py b/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py
index 83c7748af8..da1331d968 100755
--- a/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_pdu.py
@@ -36,36 +36,53 @@ class test_pdu(gr_unittest.TestCase):
         # Just run some data through and make sure it doesn't puke.
         src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
 
-        src = gr.pdu_to_tagged_stream(gr.BYTE);
-        snk3 = gr.tagged_stream_to_pdu(gr.BYTE);
-        snk2 = gr.vector_sink_b();
-        snk = gr.tag_debug(1, "test");
+        src = gr.pdu_to_tagged_stream(gr.BYTE)
+        snk3 = gr.tagged_stream_to_pdu(gr.BYTE)
+        snk2 = gr.vector_sink_b()
+        snk = gr.tag_debug(1, "test")
 
-        dbg = gr.message_debug();
+        dbg = gr.message_debug()
 
         self.tb.connect(src, snk)
         self.tb.connect(src, snk2)
         self.tb.connect(src, snk3)
 
-        self.tb.msg_connect(snk3, "pdus", dbg, "print");
+        self.tb.msg_connect(snk3, "pdus", dbg, "store")
         self.tb.start()
 
         # make our reference and message pmts
-        port = pmt.pmt_intern("pdus");
-        msg = pmt.pmt_cons( pmt.PMT_NIL, pmt.pmt_make_u8vector(16, 0xFF) );
+        port = pmt.pmt_intern("pdus")
+        msg = pmt.pmt_cons( pmt.PMT_NIL, pmt.pmt_make_u8vector(16, 0xFF) )
         
-        print "printing port & msg"
-        pmt.pmt_print(port);
-        pmt.pmt_print(msg);
+        #print "printing port & msg"
+        #pmt.pmt_print(port)
+        #pmt.pmt_print(msg)
 
         # post the message
-        src.to_basic_block()._post( port, msg );
+        src.to_basic_block()._post( port, msg )
 
-        time.sleep(1);
-        self.tb.stop();
-        self.tb.wait();
+        while(dbg.num_messages() < 1):
+            time.sleep(0.5)
+        self.tb.stop()
+        self.tb.wait()
 
-        print snk2.data();
+        # Get the vector of data from the vector sink
+        result_data = snk2.data()
+
+        # Get the vector of data from the message sink
+        # Convert the message PMT as a pair into its vector
+        result_msg = dbg.get_message(0)
+        msg_vec = pmt.pmt_cdr(result_msg)
+        pmt.pmt_print(msg_vec)
+
+        # Convert the PMT vector into a Python list
+        msg_data = []
+        for i in xrange(16):
+            msg_data.append(pmt.pmt_u8vector_ref(msg_vec, i))
+
+        actual_data = 16*[0xFF,]
+        self.assertEqual(actual_data, list(result_data))
+        self.assertEqual(actual_data, msg_data)
 
 if __name__ == '__main__':
     gr_unittest.run(test_pdu, "test_pdu.xml")
diff --git a/grc/blocks/gr_message_debug.xml b/grc/blocks/gr_message_debug.xml
index 9478f53045..705a7cc5f3 100644
--- a/grc/blocks/gr_message_debug.xml
+++ b/grc/blocks/gr_message_debug.xml
@@ -12,6 +12,11 @@
 	<sink>
 		<name>print</name>
 		<type>message</type>
-        <optional>1</optional>
+		<optional>1</optional>
+	</sink>
+	<sink>
+		<name>store</name>
+		<type>message</type>
+		<optional>1</optional>
 	</sink>
 </block>
-- 
cgit v1.2.3


From 52ca5e2765b7a4532d26502b5b76b7c85c5019d7 Mon Sep 17 00:00:00 2001
From: Tim O'Shea <tim.oshea753@gmail.com>
Date: Fri, 7 Dec 2012 09:28:41 -0800
Subject: core: added gr_tuntap_pdu, gr_socket_pdu, and msg passing
 enhancements

---
 CMakeLists.txt                                     |   7 +
 gnuradio-core/src/lib/io/CMakeLists.txt            |   4 +
 gnuradio-core/src/lib/io/gr_message_debug.cc       |  34 +-
 gnuradio-core/src/lib/io/gr_message_debug.h        |   1 +
 gnuradio-core/src/lib/io/gr_pdu.cc                 |  70 ++--
 gnuradio-core/src/lib/io/gr_pdu.h                  |   1 +
 gnuradio-core/src/lib/io/gr_socket_pdu.cc          | 157 ++++++++
 gnuradio-core/src/lib/io/gr_socket_pdu.h           | 203 +++++++++++
 gnuradio-core/src/lib/io/gr_socket_pdu.i           |  33 ++
 gnuradio-core/src/lib/io/gr_stream_pdu_base.cc     | 117 ++++++
 gnuradio-core/src/lib/io/gr_stream_pdu_base.h      |  62 ++++
 gnuradio-core/src/lib/io/gr_tuntap_pdu.cc          | 143 ++++++++
 gnuradio-core/src/lib/io/gr_tuntap_pdu.h           |  66 ++++
 gnuradio-core/src/lib/io/gr_tuntap_pdu.i           |  30 ++
 gnuradio-core/src/lib/io/io.i                      |   5 +
 gnuradio-core/src/lib/runtime/gr_basic_block.cc    |  41 +--
 gnuradio-core/src/lib/runtime/gr_basic_block.h     | 406 +++++++++++----------
 gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc |  34 +-
 gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h  |   4 +-
 gnuradio-core/src/lib/runtime/gr_flowgraph.cc      |  40 +-
 gnuradio-core/src/lib/runtime/gr_flowgraph.h       |  61 +++-
 gnuradio-core/src/lib/runtime/gr_hier_block2.cc    |   5 +-
 gnuradio-core/src/lib/runtime/gr_hier_block2.h     |  33 ++
 gnuradio-core/src/lib/runtime/gr_hier_block2.i     |   6 +
 .../src/lib/runtime/gr_hier_block2_detail.cc       |  90 ++++-
 .../src/lib/runtime/gr_hier_block2_detail.h        |   1 +
 .../src/python/gnuradio/gr/hier_block2.py          |  13 +
 grc/blocks/block_tree.xml                          |   2 +
 grc/blocks/gr_message_debug.xml                    |   5 +
 grc/blocks/gr_socket_pdu.xml                       |  62 ++++
 grc/blocks/gr_tuntap_pdu.xml                       |  34 ++
 grc/blocks/pad_sink.xml                            |   9 +-
 grc/blocks/pad_source.xml                          |   9 +-
 grc/python/FlowGraph.py                            |  10 +
 grc/python/convert_hier.py                         |  20 +-
 grc/python/flow_graph.tmpl                         |  17 +-
 gruel/src/include/gruel/pmt.h                      |   6 +
 gruel/src/lib/pmt/pmt.cc                           |  16 +
 gruel/src/swig/pmt_swig.i                          |   5 +
 volk/CMakeLists.txt                                |  10 +-
 volk/apps/CMakeLists.txt                           |   2 +-
 41 files changed, 1595 insertions(+), 279 deletions(-)
 create mode 100644 gnuradio-core/src/lib/io/gr_socket_pdu.cc
 create mode 100644 gnuradio-core/src/lib/io/gr_socket_pdu.h
 create mode 100644 gnuradio-core/src/lib/io/gr_socket_pdu.i
 create mode 100644 gnuradio-core/src/lib/io/gr_stream_pdu_base.cc
 create mode 100644 gnuradio-core/src/lib/io/gr_stream_pdu_base.h
 create mode 100644 gnuradio-core/src/lib/io/gr_tuntap_pdu.cc
 create mode 100644 gnuradio-core/src/lib/io/gr_tuntap_pdu.h
 create mode 100644 gnuradio-core/src/lib/io/gr_tuntap_pdu.i
 create mode 100644 grc/blocks/gr_socket_pdu.xml
 create mode 100644 grc/blocks/gr_tuntap_pdu.xml

(limited to 'grc')

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9af8d7eb9b..bc076b9e74 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -49,6 +49,13 @@ include(GrVersion) #setup version info
 SET(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O2")
 SET(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O2")
 
+########################################################################
+# Environment setup
+########################################################################
+IF(NOT DEFINED BOOST_ROOT)
+    SET(BOOST_ROOT ${CMAKE_INSTALL_PREFIX})
+ENDIF()
+
 ########################################################################
 # Import executables from a native build (for cross compiling)
 # http://www.vtk.org/Wiki/CMake_Cross_Compiling#Using_executables_in_the_build_created_during_the_build
diff --git a/gnuradio-core/src/lib/io/CMakeLists.txt b/gnuradio-core/src/lib/io/CMakeLists.txt
index 7041f28206..59ca06b5a2 100644
--- a/gnuradio-core/src/lib/io/CMakeLists.txt
+++ b/gnuradio-core/src/lib/io/CMakeLists.txt
@@ -39,6 +39,7 @@ list(APPEND gnuradio_core_sources
     ${CMAKE_CURRENT_SOURCE_DIR}/ppio_ppdev.cc
     ${CMAKE_CURRENT_SOURCE_DIR}/gri_wavfile.cc
     ${CMAKE_CURRENT_SOURCE_DIR}/gr_pdu.cc
+    ${CMAKE_CURRENT_SOURCE_DIR}/gr_stream_pdu_base.cc
 )
 
 ########################################################################
@@ -61,6 +62,7 @@ install(FILES
     ${CMAKE_CURRENT_SOURCE_DIR}/ppio_ppdev.h
     ${CMAKE_CURRENT_SOURCE_DIR}/gri_wavfile.h
     ${CMAKE_CURRENT_SOURCE_DIR}/gr_pdu.h
+    ${CMAKE_CURRENT_SOURCE_DIR}/gr_stream_pdu_base.h
     DESTINATION ${GR_INCLUDE_DIR}/gnuradio
     COMPONENT "core_devel"
 )
@@ -103,6 +105,8 @@ set(gr_core_io_triple_threats
     gr_wavfile_sink
     gr_tagged_file_sink
     gr_tagged_stream_to_pdu
+    gr_tuntap_pdu
+    gr_socket_pdu
 )
 
 foreach(file_tt ${gr_core_io_triple_threats})
diff --git a/gnuradio-core/src/lib/io/gr_message_debug.cc b/gnuradio-core/src/lib/io/gr_message_debug.cc
index 7d28ff18e9..27f4c65fdc 100644
--- a/gnuradio-core/src/lib/io/gr_message_debug.cc
+++ b/gnuradio-core/src/lib/io/gr_message_debug.cc
@@ -58,6 +58,30 @@ gr_message_debug::store(pmt::pmt_t msg)
   d_messages.push_back(msg);
 }
 
+void
+gr_message_debug::print_verbose(pmt::pmt_t msg)
+{
+  pmt::pmt_t meta = pmt::pmt_car(msg);
+  pmt::pmt_t vector = pmt::pmt_cdr(msg);
+  std::cout << "* MESSAGE DEBUG PRINT PDU VERBOSE *\n";
+  pmt::pmt_print(meta);
+  size_t len = pmt::pmt_length(vector);
+  std::cout << "pdu_length = " << len << std::endl;
+  std::cout << "contents = " << std::endl;
+  size_t offset(0);
+  const uint8_t* d = (const uint8_t*) pmt_uniform_vector_elements(vector, offset);
+  for(size_t i=0; i<len; i+=16){
+    printf("%04x: ", i);
+    for(size_t j=i; j<std::min(i+16,len); j++){
+      printf("%02x ",d[j] );
+    }
+
+    std::cout << std::endl;
+  }
+
+  std::cout << "***********************************\n";
+}
+
 int
 gr_message_debug::num_messages()
 {
@@ -81,11 +105,11 @@ gr_message_debug::gr_message_debug()
 	     gr_make_io_signature(0, 0, 0),
 	     gr_make_io_signature(0, 0, 0))
 {
-    message_port_register_in(pmt::mp("print"));
-    set_msg_handler(pmt::mp("print"), boost::bind(&gr_message_debug::print, this, _1));
-
-    message_port_register_in(pmt::mp("store"));
-    set_msg_handler(pmt::mp("store"), boost::bind(&gr_message_debug::store, this, _1));
+  message_port_register_in(pmt::mp("print"));
+  set_msg_handler(pmt::mp("print"), boost::bind(&gr_message_debug::print, this, _1));
+  
+  message_port_register_in(pmt::mp("store"));
+  set_msg_handler(pmt::mp("store"), boost::bind(&gr_message_debug::store, this, _1));
 }
 
 gr_message_debug::~gr_message_debug()
diff --git a/gnuradio-core/src/lib/io/gr_message_debug.h b/gnuradio-core/src/lib/io/gr_message_debug.h
index 1ffef1b023..6e6e5103cb 100644
--- a/gnuradio-core/src/lib/io/gr_message_debug.h
+++ b/gnuradio-core/src/lib/io/gr_message_debug.h
@@ -55,6 +55,7 @@ class GR_CORE_API gr_message_debug : public gr_block
    * \param msg A pmt message passed from the scheduler's message handling.
    */
   void print(pmt::pmt_t msg);
+  void print_verbose(pmt::pmt_t msg);
 
   /*!
    * \brief Messages received in this port are stored in a vector.
diff --git a/gnuradio-core/src/lib/io/gr_pdu.cc b/gnuradio-core/src/lib/io/gr_pdu.cc
index f33eed0a37..b2757c307e 100644
--- a/gnuradio-core/src/lib/io/gr_pdu.cc
+++ b/gnuradio-core/src/lib/io/gr_pdu.cc
@@ -28,42 +28,52 @@
 
 size_t 
 gr_pdu_itemsize(gr_pdu_vector_type type){
-    switch(type){
-        case BYTE:
-            return 1;
-        case FLOAT:
-            return sizeof(float);
-        case COMPLEX:
-            return sizeof(gr_complex);
-        default:
-            throw std::runtime_error("bad type!");
-    }
+  switch(type){
+  case BYTE:
+    return 1;
+  case FLOAT:
+    return sizeof(float);
+  case COMPLEX:
+    return sizeof(gr_complex);
+  default:
+    throw std::runtime_error("bad type!");
+  }
 }
 
 bool
 gr_pdu_type_matches(gr_pdu_vector_type type, pmt::pmt_t v){
-    switch(type){
-        case BYTE:
-            return pmt::pmt_is_u8vector(v);
-        case FLOAT:
-            return pmt::pmt_is_f32vector(v);
-        case COMPLEX:
-            return pmt::pmt_is_c32vector(v);
-        default:
-            throw std::runtime_error("bad type!");
-    }
+  switch(type){
+  case BYTE:
+    return pmt::pmt_is_u8vector(v);
+  case FLOAT:
+    return pmt::pmt_is_f32vector(v);
+  case COMPLEX:
+    return pmt::pmt_is_c32vector(v);
+  default:
+    throw std::runtime_error("bad type!");
+  }
 }
 
 pmt::pmt_t
 gr_pdu_make_vector(gr_pdu_vector_type type, const uint8_t* buf, size_t items){
-    switch(type){
-        case BYTE:
-            return pmt::pmt_init_u8vector(items, buf);
-        case FLOAT:
-            return pmt::pmt_init_f32vector(items, (const float*)buf);
-        case COMPLEX:
-            return pmt::pmt_init_c32vector(items, (const gr_complex*)buf);
-        default:
-            throw std::runtime_error("bad type!");
-    }
+  switch(type){
+  case BYTE:
+    return pmt::pmt_init_u8vector(items, buf);
+  case FLOAT:
+    return pmt::pmt_init_f32vector(items, (const float*)buf);
+  case COMPLEX:
+    return pmt::pmt_init_c32vector(items, (const gr_complex*)buf);
+  default:
+    throw std::runtime_error("bad type!");
+  }
+}
+
+gr_pdu_vector_type type_from_pmt(pmt::pmt_t vector){
+  if(pmt_is_u8vector(vector))
+    return BYTE;
+  if(pmt_is_f32vector(vector))
+    return FLOAT;
+  if(pmt_is_c32vector(vector))
+    return COMPLEX;
+  throw std::runtime_error("bad type!");
 }
diff --git a/gnuradio-core/src/lib/io/gr_pdu.h b/gnuradio-core/src/lib/io/gr_pdu.h
index 67519c89db..5ed9cdded8 100644
--- a/gnuradio-core/src/lib/io/gr_pdu.h
+++ b/gnuradio-core/src/lib/io/gr_pdu.h
@@ -34,5 +34,6 @@ enum gr_pdu_vector_type { BYTE, FLOAT, COMPLEX };
 size_t gr_pdu_itemsize(gr_pdu_vector_type type);
 bool gr_pdu_type_matches(gr_pdu_vector_type type, pmt::pmt_t v);
 pmt::pmt_t gr_pdu_make_vector(gr_pdu_vector_type type, const uint8_t* buf, size_t items);
+gr_pdu_vector_type type_from_pmt(pmt::pmt_t vector);
 
 #endif
diff --git a/gnuradio-core/src/lib/io/gr_socket_pdu.cc b/gnuradio-core/src/lib/io/gr_socket_pdu.cc
new file mode 100644
index 0000000000..bb374b3006
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_socket_pdu.cc
@@ -0,0 +1,157 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gr_socket_pdu.h>
+#include <gr_io_signature.h>
+#include <cstdio>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <stdexcept>
+#include <string.h>
+#include <iostream>
+#include <gr_pdu.h>
+#include <boost/format.hpp>
+
+// public constructor that returns a shared_ptr
+gr_socket_pdu_sptr
+gr_make_socket_pdu (std::string type, std::string addr, std::string port, int MTU)
+{
+  return gnuradio::get_initial_sptr(new gr_socket_pdu(type,addr,port,MTU));
+}
+
+gr_socket_pdu::gr_socket_pdu (std::string type, std::string addr, std::string port, int MTU)
+    : gr_stream_pdu_base(MTU)
+{
+
+    if( (type == "TCP_SERVER") || (type == "TCP_CLIENT")){
+        boost::asio::ip::tcp::resolver resolver(_io_service);
+        boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), addr, port);
+        _tcp_endpoint = *resolver.resolve(query);
+    }
+    if( (type == "UDP_SERVER") || (type == "UDP_CLIENT")){
+        boost::asio::ip::udp::resolver resolver(_io_service);
+        boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port);
+        if( (type == "UDP_SERVER") ){
+            _udp_endpoint = *resolver.resolve(query);
+        } else {
+            _udp_endpoint_other = *resolver.resolve(query);
+        }
+    }
+
+    // register ports
+    message_port_register_out(pmt::mp("pdus"));
+    message_port_register_in(pmt::mp("pdus"));
+
+    // set up socketry
+    if (type == "TCP_SERVER"){
+        _acceptor_tcp.reset(new boost::asio::ip::tcp::acceptor(_io_service, _tcp_endpoint));
+        _acceptor_tcp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
+        start_tcp_accept();
+        // bind tcp server send handler
+        set_msg_handler(pmt::mp("pdus"), boost::bind(&gr_socket_pdu::tcp_server_send, this, _1));
+    } else if(type =="TCP_CLIENT"){
+        boost::system::error_code error = boost::asio::error::host_not_found;
+        _tcp_socket.reset(new boost::asio::ip::tcp::socket(_io_service));
+        _tcp_socket->connect(_tcp_endpoint, error);
+        if(error){
+            throw boost::system::system_error(error);
+        }
+        set_msg_handler(pmt::mp("pdus"), boost::bind(&gr_socket_pdu::tcp_client_send, this, _1));
+        _tcp_socket->async_read_some(
+            boost::asio::buffer(rxbuf),
+            boost::bind(&gr_socket_pdu::handle_tcp_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
+
+    } else if(type =="UDP_SERVER"){
+        _udp_socket.reset(new boost::asio::ip::udp::socket(_io_service, _udp_endpoint));
+        _udp_socket->async_receive_from( boost::asio::buffer(rxbuf), _udp_endpoint_other, 
+            boost::bind(&gr_socket_pdu::handle_udp_read, this,
+                boost::asio::placeholders::error,
+                boost::asio::placeholders::bytes_transferred)); 
+        set_msg_handler(pmt::mp("pdus"), boost::bind(&gr_socket_pdu::udp_send, this, _1));
+    } else if(type =="UDP_CLIENT"){
+        _udp_socket.reset(new boost::asio::ip::udp::socket(_io_service, _udp_endpoint));
+        _udp_socket->async_receive_from( boost::asio::buffer(rxbuf), _udp_endpoint_other, 
+            boost::bind(&gr_socket_pdu::handle_udp_read, this,
+                boost::asio::placeholders::error,
+                boost::asio::placeholders::bytes_transferred)); 
+        set_msg_handler(pmt::mp("pdus"), boost::bind(&gr_socket_pdu::udp_send, this, _1));
+    } else {
+        throw std::runtime_error("unknown socket type!");
+    }
+    
+    // start thread for io_service
+    d_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&gr_socket_pdu::run_io_service, this)));
+    d_started = true;
+}
+
+void tcp_connection::handle_read(const boost::system::error_code& error/*error*/, size_t bytes_transferred)
+  {
+    if(!error)
+    {
+        pmt::pmt_t vector = pmt::pmt_init_u8vector(bytes_transferred, (const uint8_t*)&buf[0]);
+        pmt::pmt_t pdu = pmt::pmt_cons( pmt::PMT_NIL, vector);
+
+        d_block->message_port_pub( pmt::mp("pdus"), pdu );
+
+        socket_.async_read_some(
+            boost::asio::buffer(buf),
+            boost::bind(&tcp_connection::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
+
+    } else {
+        std::cout << "error occurred\n";
+    }
+
+  }
+
+
+void gr_socket_pdu::tcp_server_send(pmt::pmt_t msg){
+    pmt::pmt_t vector = pmt::pmt_cdr(msg);
+    for(size_t i=0; i<d_tcp_connections.size(); i++){
+        d_tcp_connections[i]->send(vector);
+    }
+}
+
+void gr_socket_pdu::tcp_client_send(pmt::pmt_t msg){
+    pmt::pmt_t vector = pmt::pmt_cdr(msg);
+    size_t len = pmt::pmt_length(vector);
+    size_t offset(0);
+    boost::array<char, 10000> txbuf;
+    memcpy(&txbuf[0], pmt::pmt_uniform_vector_elements(vector, offset), len);
+    _tcp_socket->send(boost::asio::buffer(txbuf,len));
+}
+
+void gr_socket_pdu::udp_send(pmt::pmt_t msg){
+    pmt::pmt_t vector = pmt::pmt_cdr(msg);
+    size_t len = pmt::pmt_length(vector);
+    size_t offset(0);
+    boost::array<char, 10000> txbuf;
+    memcpy(&txbuf[0], pmt::pmt_uniform_vector_elements(vector, offset), len);
+    if(_udp_endpoint_other.address().to_string() != "0.0.0.0")
+        _udp_socket->send_to(boost::asio::buffer(txbuf,len), _udp_endpoint_other);
+}
diff --git a/gnuradio-core/src/lib/io/gr_socket_pdu.h b/gnuradio-core/src/lib/io/gr_socket_pdu.h
new file mode 100644
index 0000000000..3a96a3f97f
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_socket_pdu.h
@@ -0,0 +1,203 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_SOCKET_PDU_H
+#define INCLUDED_GR_SOCKET_PDU_H
+
+#include <gr_core_api.h>
+#include <gr_sync_block.h>
+#include <gr_message.h>
+#include <gr_msg_queue.h>
+#include <gr_stream_pdu_base.h>
+#include <boost/asio.hpp>
+
+#include <linux/if_tun.h>
+
+class gr_socket_pdu;
+typedef boost::shared_ptr<gr_socket_pdu> gr_socket_pdu_sptr;
+
+GR_CORE_API gr_socket_pdu_sptr gr_make_socket_pdu (std::string type, std::string addr, std::string port, int MTU=10000);
+
+class tcp_connection
+  : public boost::enable_shared_from_this<tcp_connection>
+{
+public:
+  typedef boost::shared_ptr<tcp_connection> pointer;
+  gr_socket_pdu *d_block;
+  boost::array<char, 10000> buf;
+
+  static pointer create(boost::asio::io_service& io_service)
+  {
+    return pointer(new tcp_connection(io_service));
+  }
+
+  boost::asio::ip::tcp::socket& socket()
+  {
+    return socket_;
+  }
+
+  void start(gr_socket_pdu* parent)
+  {
+    d_block = parent;
+//    message_ = "connected to gr_socket_pdu\n";
+//    boost::asio::async_write(socket_, boost::asio::buffer(message_),
+//        boost::bind(&tcp_connection::handle_write, shared_from_this(),
+//          boost::asio::placeholders::error,
+//          boost::asio::placeholders::bytes_transferred));
+
+    socket_.async_read_some(
+        boost::asio::buffer(buf),
+         boost::bind(&tcp_connection::handle_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
+  }
+  void send(pmt::pmt_t vector){
+    size_t len = pmt::pmt_length(vector);
+    size_t offset(0);
+    boost::array<char, 10000> txbuf;
+    memcpy(&txbuf[0], pmt::pmt_uniform_vector_elements(vector, offset), len);
+    boost::asio::async_write(socket_, boost::asio::buffer(txbuf, len),
+        boost::bind(&tcp_connection::handle_write, shared_from_this(),
+          boost::asio::placeholders::error,
+          boost::asio::placeholders::bytes_transferred));
+    }
+
+  ~tcp_connection(){
+//    std::cout << "tcp_connection destroyed\n";
+    }
+
+private:
+  tcp_connection(boost::asio::io_service& io_service)
+    : socket_(io_service)
+  {
+  }
+
+  void handle_read(const boost::system::error_code& error/*error*/, size_t bytes_transferred);
+
+  void handle_write(const boost::system::error_code& /*error*/,
+      size_t /*bytes_transferred*/)
+  {
+  }
+
+  boost::asio::ip::tcp::socket socket_;
+  std::string message_;
+};
+
+
+/*!
+ * \brief Gather received items into messages and insert into msgq
+ * \ingroup sink_blk
+ */
+class GR_CORE_API gr_socket_pdu : public gr_stream_pdu_base
+{
+ private:
+  friend GR_CORE_API gr_socket_pdu_sptr
+  gr_make_socket_pdu(std::string type, std::string addr, std::string port, int MTU);
+
+  boost::asio::io_service _io_service;
+ 
+  boost::array<char, 10000> rxbuf;
+
+  // tcp specific
+  boost::asio::ip::tcp::endpoint _tcp_endpoint;
+
+  // specific to tcp server
+  boost::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor_tcp;
+  std::vector<tcp_connection::pointer> d_tcp_connections;
+  void tcp_server_send(pmt::pmt_t msg);
+  void tcp_client_send(pmt::pmt_t msg);
+  void udp_send(pmt::pmt_t msg);
+
+  // specific to tcp client
+  boost::shared_ptr<boost::asio::ip::tcp::socket> _tcp_socket;
+
+  // specific to udp client/server
+  boost::asio::ip::udp::endpoint _udp_endpoint;
+  boost::asio::ip::udp::endpoint _udp_endpoint_other;
+  boost::shared_ptr<boost::asio::ip::udp::socket> _udp_socket;
+
+  void handle_receive(const boost::system::error_code& error, std::size_t ){
+    }
+ 
+  void start_tcp_accept(){
+    tcp_connection::pointer new_connection =
+        tcp_connection::create(_acceptor_tcp->get_io_service());
+
+    _acceptor_tcp->async_accept(new_connection->socket(),
+        boost::bind(&gr_socket_pdu::handle_tcp_accept, this, new_connection,
+            boost::asio::placeholders::error));
+    }
+
+  void handle_tcp_accept(tcp_connection::pointer new_connection, const boost::system::error_code& error){
+        if (!error)
+        {
+            new_connection->start(this);
+            d_tcp_connections.push_back(new_connection);
+            start_tcp_accept();
+        } else {
+            std::cout << error << std::endl;
+        }
+    }
+ 
+  void run_io_service(){
+    _io_service.run();
+    } 
+
+  void handle_udp_read(const boost::system::error_code& error/*error*/, size_t bytes_transferred){
+    if(!error){
+        pmt::pmt_t vector = pmt::pmt_init_u8vector(bytes_transferred, (const uint8_t*)&rxbuf[0]);
+        pmt::pmt_t pdu = pmt::pmt_cons( pmt::PMT_NIL, vector);
+        
+        message_port_pub( pmt::mp("pdus"), pdu );
+    
+        _udp_socket->async_receive_from( boost::asio::buffer(rxbuf), _udp_endpoint_other,
+            boost::bind(&gr_socket_pdu::handle_udp_read, this,
+                boost::asio::placeholders::error,
+                boost::asio::placeholders::bytes_transferred));
+    } else {
+        throw boost::system::system_error(error);
+//        std::cout << "error occurred\n";
+    }
+  }
+  void handle_tcp_read(const boost::system::error_code& error/*error*/, size_t bytes_transferred){
+    if(!error)
+    {
+        pmt::pmt_t vector = pmt::pmt_init_u8vector(bytes_transferred, (const uint8_t*)&rxbuf[0]);
+        pmt::pmt_t pdu = pmt::pmt_cons( pmt::PMT_NIL, vector);
+
+        message_port_pub( pmt::mp("pdus"), pdu );
+
+        _tcp_socket->async_read_some(
+            boost::asio::buffer(rxbuf),
+            boost::bind(&gr_socket_pdu::handle_tcp_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
+
+    } else {
+        //std::cout << "error occurred\n";
+        throw boost::system::system_error(error);
+    }
+  }
+
+ protected:
+  gr_socket_pdu (std::string type, std::string addr, std::string port, int MTU=10000);
+ public:
+  ~gr_socket_pdu () {}
+};
+
+#endif /* INCLUDED_GR_TUNTAP_PDU_H */
diff --git a/gnuradio-core/src/lib/io/gr_socket_pdu.i b/gnuradio-core/src/lib/io/gr_socket_pdu.i
new file mode 100644
index 0000000000..3e20b63e20
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_socket_pdu.i
@@ -0,0 +1,33 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2005 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+GR_SWIG_BLOCK_MAGIC(gr,socket_pdu);
+
+%ignore tcp_connection;
+
+%{
+#include <gr_socket_pdu.h>
+%}
+
+%include "gr_stream_pdu_base.h"
+%include "gr_socket_pdu.h"
+
diff --git a/gnuradio-core/src/lib/io/gr_stream_pdu_base.cc b/gnuradio-core/src/lib/io/gr_stream_pdu_base.cc
new file mode 100644
index 0000000000..cff7296cba
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_stream_pdu_base.cc
@@ -0,0 +1,117 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gr_stream_pdu_base.h>
+#include <gr_io_signature.h>
+#include <cstdio>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <stdexcept>
+#include <string.h>
+#include <iostream>
+#include <gr_pdu.h>
+#include <boost/format.hpp>
+
+static const long timeout_us = 100*1000; //100ms
+
+gr_stream_pdu_base::gr_stream_pdu_base (int MTU)
+  : gr_sync_block("stream_pdu_base",
+		  gr_make_io_signature(0, 0, 0),
+		  gr_make_io_signature(0, 0, 0)),
+    d_finished(false), d_started(false), d_fd(-1)
+{
+    // reserve space for rx buffer 
+    d_rxbuf.resize(MTU,0);
+}
+
+gr_stream_pdu_base::~gr_stream_pdu_base()
+{
+    stop_rxthread();
+}
+
+void gr_stream_pdu_base::stop_rxthread(){
+    d_finished = true;
+    if(d_started){
+        d_thread->interrupt();
+        d_thread->join();
+        }
+    }
+
+void gr_stream_pdu_base::start_rxthread(pmt::pmt_t _rxport){
+    rxport = _rxport;
+    d_thread = boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&gr_stream_pdu_base::run, this)));
+    d_started = true;
+    }
+
+void gr_stream_pdu_base::run(){
+    while(!d_finished) {
+        if(not wait_ready()){ continue; }   
+        const int result = read( d_fd, &d_rxbuf[0], d_rxbuf.size() );
+        if(result <= 0){ throw std::runtime_error("gr_stream_pdu_base, bad socket read!"); }
+        pmt::pmt_t vector = pmt::pmt_init_u8vector(result, &d_rxbuf[0]);       
+        pmt::pmt_t pdu = pmt::pmt_cons( pmt::PMT_NIL, vector);
+        message_port_pub(rxport, pdu);
+    } 
+}
+
+void gr_stream_pdu_base::send(pmt::pmt_t msg){
+    pmt::pmt_t vector = pmt::pmt_cdr(msg);
+    size_t offset(0);
+    size_t itemsize(gr_pdu_itemsize(type_from_pmt(vector)));
+    int len( pmt::pmt_length(vector)*itemsize );
+    
+    const int rv = write(d_fd, pmt::pmt_uniform_vector_elements(vector, offset), len);
+    if(rv != len){
+        std::cerr << boost::format("WARNING: gr_stream_pdu_base::send(pdu) write failed! (d_fd=%d, len=%d, rv=%d)")
+                            % d_fd % len % rv << std::endl;
+        }
+}
+
+int
+gr_stream_pdu_base::work(int noutput_items,
+		      gr_vector_const_void_star &input_items,
+		      gr_vector_void_star &output_items)
+{
+  throw std::runtime_error("should not be called.\n");
+  return 0; 
+}
+
+bool gr_stream_pdu_base::wait_ready(){
+    //setup timeval for timeout
+    timeval tv;
+    tv.tv_sec = 0;
+    tv.tv_usec = timeout_us;
+    
+    //setup rset for timeout
+    fd_set rset;
+    FD_ZERO(&rset);
+    FD_SET(d_fd, &rset);
+
+    //call select with timeout on receive socket
+    return ::select(d_fd+1, &rset, NULL, NULL, &tv) > 0;
+}
diff --git a/gnuradio-core/src/lib/io/gr_stream_pdu_base.h b/gnuradio-core/src/lib/io/gr_stream_pdu_base.h
new file mode 100644
index 0000000000..dc5dc5c2e9
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_stream_pdu_base.h
@@ -0,0 +1,62 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_STREAM_PDU_BASE_H
+#define INCLUDED_GR_STREAM_PDU_BASE_H
+
+#include <gr_core_api.h>
+#include <gr_sync_block.h>
+#include <gr_message.h>
+#include <gr_msg_queue.h>
+
+#include <linux/if_tun.h>
+
+
+/*!
+ * \brief Gather received items into messages and insert into msgq
+ * \ingroup sink_blk
+ */
+class GR_CORE_API gr_stream_pdu_base : public gr_sync_block
+{
+ public:
+  boost::shared_ptr<boost::thread> d_thread;
+  bool d_finished;
+  bool d_started;
+  std::vector<uint8_t> d_rxbuf;
+  void run();
+  int d_fd;
+  gr_stream_pdu_base (int MTU=10000);
+  ~gr_stream_pdu_base ();
+  void send(pmt::pmt_t msg);
+  bool wait_ready();
+  int work (int noutput_items,
+	    gr_vector_const_void_star &input_items,
+	    gr_vector_void_star &output_items);
+  void start_rxthread(pmt::pmt_t _rxport);
+  void stop_rxthread();
+ private:
+  pmt::pmt_t rxport;
+};
+
+typedef boost::shared_ptr<gr_stream_pdu_base> gr_stream_pdu_base_sptr;
+
+#endif /* INCLUDED_GR_TUNTAP_PDU_H */
diff --git a/gnuradio-core/src/lib/io/gr_tuntap_pdu.cc b/gnuradio-core/src/lib/io/gr_tuntap_pdu.cc
new file mode 100644
index 0000000000..44de1a5f7d
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_tuntap_pdu.cc
@@ -0,0 +1,143 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gr_tuntap_pdu.h>
+#include <gr_io_signature.h>
+#include <cstdio>
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <stdexcept>
+#include <string.h>
+#include <iostream>
+#include <gr_pdu.h>
+#include <boost/format.hpp>
+
+#if (defined(linux) || defined(__linux) || defined(__linux__))
+
+#include <sys/ioctl.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+#include <arpa/inet.h>
+#include <linux/if.h>
+
+
+// public constructor that returns a shared_ptr
+
+gr_tuntap_pdu_sptr
+gr_make_tuntap_pdu (std::string dev, int MTU)
+{
+  return gnuradio::get_initial_sptr(new gr_tuntap_pdu(dev, MTU));
+}
+
+gr_tuntap_pdu::gr_tuntap_pdu (std::string dev, int MTU)
+    : gr_stream_pdu_base(MTU)
+{
+
+    // make the tuntap
+    char dev_cstr[1024];
+    memset(dev_cstr, 0x00, 1024);
+    strncpy(dev_cstr, dev.c_str(), std::min(sizeof(dev_cstr), dev.size()));
+    d_fd = tun_alloc(dev_cstr);
+    if(d_fd <= 0){
+        throw std::runtime_error("TunTap make: tun_alloc failed (are you running as root?)");
+        }
+
+    std::cout << boost::format(
+        "Allocated virtual ethernet interface: %s\n"
+        "You must now use ifconfig to set its IP address. E.g.,\n"
+        "  $ sudo ifconfig %s 192.168.200.1\n"
+        "Be sure to use a different address in the same subnet for each machine.\n"
+        ) % dev % dev << std::endl;
+
+    // set up output message port
+    message_port_register_out(pmt::mp("pdus"));
+    start_rxthread(pmt::mp("pdus"));
+    
+    // set up input message port
+    message_port_register_in(pmt::mp("pdus"));
+    set_msg_handler(pmt::mp("pdus"), boost::bind(&gr_tuntap_pdu::send, this, _1));
+}
+
+
+int gr_tuntap_pdu::tun_alloc(char *dev, int flags) {
+  struct ifreq ifr;
+  int fd, err;
+  const char *clonedev = "/dev/net/tun";
+
+  /* Arguments taken by the function:
+   *
+   * char *dev: the name of an interface (or '\0'). MUST have enough
+   *   space to hold the interface name if '\0' is passed
+   * int flags: interface flags (eg, IFF_TUN etc.)
+   */
+
+   /* open the clone device */
+   if( (fd = open(clonedev, O_RDWR)) < 0 ) {
+     return fd;
+   }
+
+   /* preparation of the struct ifr, of type "struct ifreq" */
+   memset(&ifr, 0, sizeof(ifr));
+
+   ifr.ifr_flags = flags;   /* IFF_TUN or IFF_TAP, plus maybe IFF_NO_PI */
+
+   if (*dev) {
+     /* if a device name was specified, put it in the structure; otherwise,
+      * the kernel will try to allocate the "next" device of the
+      * specified type */
+     strncpy(ifr.ifr_name, dev, IFNAMSIZ);
+   }
+
+   /* try to create the device */
+   if( (err = ioctl(fd, TUNSETIFF, (void *) &ifr)) < 0 ) {
+     close(fd);
+     return err;
+   }
+
+  /* if the operation was successful, write back the name of the
+   * interface to the variable "dev", so the caller can know
+   * it. Note that the caller MUST reserve space in *dev (see calling
+   * code below) */
+  strcpy(dev, ifr.ifr_name);
+
+  /* this is the special file descriptor that the caller will use to talk
+   * with the virtual interface */
+  return fd;
+}
+
+#else //if not linux
+
+boost::shared_ptr<gr_block> gr_make_tuntap_pdu (std::string dev, int MTU){
+    boost::shared_ptr<gr_block> rv;
+    throw std::runtime_error("tuntap only implemented on linux");
+    return rv;
+}
+
+#endif
diff --git a/gnuradio-core/src/lib/io/gr_tuntap_pdu.h b/gnuradio-core/src/lib/io/gr_tuntap_pdu.h
new file mode 100644
index 0000000000..0e8071c30d
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_tuntap_pdu.h
@@ -0,0 +1,66 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2012 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_GR_TUNTAP_PDU_H
+#define INCLUDED_GR_TUNTAP_PDU_H
+
+#include <gr_core_api.h>
+#include <gr_sync_block.h>
+#include <gr_message.h>
+#include <gr_msg_queue.h>
+#include <gr_stream_pdu_base.h>
+
+#if (defined(linux) || defined(__linux) || defined(__linux__))
+
+#include <linux/if_tun.h>
+
+class gr_tuntap_pdu;
+typedef boost::shared_ptr<gr_tuntap_pdu> gr_tuntap_pdu_sptr;
+
+GR_CORE_API gr_tuntap_pdu_sptr gr_make_tuntap_pdu (std::string dev, int MTU=10000);
+
+/*!
+ * \brief Gather received items into messages and insert into msgq
+ * \ingroup sink_blk
+ */
+class GR_CORE_API gr_tuntap_pdu : public gr_stream_pdu_base
+{
+ private:
+  friend GR_CORE_API gr_tuntap_pdu_sptr
+  gr_make_tuntap_pdu(std::string dev, int MTU);
+  int tun_alloc(char* dev, int flags = IFF_TAP | IFF_NO_PI);
+  std::string d_dev;
+ protected:
+  gr_tuntap_pdu (std::string dev, int MTU=10000);
+
+ public:
+  ~gr_tuntap_pdu () {}
+
+};
+
+#else // if not linux
+
+GR_CORE_API boost::shared_ptr<gr_block> gr_make_tuntap_pdu (std::string dev, int MTU=0);
+
+#endif
+
+#endif /* INCLUDED_GR_TUNTAP_PDU_H */
diff --git a/gnuradio-core/src/lib/io/gr_tuntap_pdu.i b/gnuradio-core/src/lib/io/gr_tuntap_pdu.i
new file mode 100644
index 0000000000..589bbc3853
--- /dev/null
+++ b/gnuradio-core/src/lib/io/gr_tuntap_pdu.i
@@ -0,0 +1,30 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2005 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING.  If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+GR_SWIG_BLOCK_MAGIC(gr,tuntap_pdu);
+
+%{
+#include <gr_tuntap_pdu.h>
+%}
+
+%include "gr_tuntap_pdu.h"
+
diff --git a/gnuradio-core/src/lib/io/io.i b/gnuradio-core/src/lib/io/io.i
index 871ce1356e..e2de4eb976 100644
--- a/gnuradio-core/src/lib/io/io.i
+++ b/gnuradio-core/src/lib/io/io.i
@@ -49,6 +49,8 @@
 #include <gr_tagged_stream_to_pdu.h>
 #include <gr_message_debug.h>
 #include <gr_pdu.h>
+#include <gr_tuntap_pdu.h>
+#include <gr_socket_pdu.h>
 %}
 
 %include "gr_file_sink_base.i"
@@ -75,4 +77,7 @@
 %include "gr_tagged_stream_to_pdu.i"
 %include "gr_message_debug.i"
 %include "gr_pdu.i"
+%include "gr_tuntap_pdu.i"
+%include "gr_socket_pdu.i"
+
 
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.cc b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
index 0f7875a122..6ff57a1d6c 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.cc
@@ -38,7 +38,7 @@ static long s_ncurrently_allocated = 0;
 long
 gr_basic_block_ncurrently_allocated()
 {
-    return s_ncurrently_allocated;
+  return s_ncurrently_allocated;
 }
 
 gr_basic_block::gr_basic_block(const std::string &name,
@@ -53,25 +53,25 @@ gr_basic_block::gr_basic_block(const std::string &name,
     d_color(WHITE),
     message_subscribers(pmt::pmt_make_dict())
 {
-    s_ncurrently_allocated++;
+  s_ncurrently_allocated++;
 }
 
 gr_basic_block::~gr_basic_block()
 {
-    s_ncurrently_allocated--;
-    global_block_registry.block_unregister(this);
+  s_ncurrently_allocated--;
+  global_block_registry.block_unregister(this);
 }
 
 gr_basic_block_sptr
 gr_basic_block::to_basic_block()
 {
-    return shared_from_this();
+  return shared_from_this();
 }
 
 void
 gr_basic_block::set_block_alias(std::string name)
 { 
-    global_block_registry.register_symbolic_name(this, name); 
+  global_block_registry.register_symbolic_name(this, name); 
 }
 
 // ** Message passing interface **
@@ -147,28 +147,29 @@ void gr_basic_block::message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg)
 }
 
 //  - subscribe to a message port
-void gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target)
-{
-  if(!pmt::pmt_dict_has_key(message_subscribers, port_id)) {
+void
+gr_basic_block::message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target){
+  if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ 
     std::stringstream ss;
-    ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id)
-       << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
+    ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
     throw std::runtime_error(ss.str());
   }
-  
   pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
-  message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target));
+  
+  // ignore re-adds of the same target
+  if(!pmt::pmt_list_has(currlist, target))
+    message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_add(currlist,target));
 }
 
-void gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target)
-{
-  if(!pmt::pmt_dict_has_key(message_subscribers, port_id)) {
+void
+gr_basic_block::message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target){
+  if(!pmt::pmt_dict_has_key(message_subscribers, port_id)){ 
     std::stringstream ss;
-    ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id)
-       << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
+    ss << "Port does not exist: \"" << pmt::pmt_write_string(port_id) << "\" on block: " << pmt::pmt_write_string(target) << std::endl;
     throw std::runtime_error(ss.str());
   }
-
+  
+  // ignore unsubs of unknown targets
   pmt::pmt_t currlist = pmt::pmt_dict_ref(message_subscribers,port_id,pmt::PMT_NIL);
   message_subscribers = pmt::pmt_dict_add(message_subscribers,port_id,pmt::pmt_list_rm(currlist,target));
 }
@@ -224,5 +225,3 @@ gr_basic_block::delete_head_blocking(pmt::pmt_t which_port)
   msg_queue[which_port].pop_front();
   return m;
 }
-
-
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index 00e9c2192f..f3b7b835b4 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -36,6 +36,7 @@
 #include <gruel/thread.h>
 #include <boost/foreach.hpp>
 #include <boost/thread/condition_variable.hpp>
+#include <iostream>
 
 /*!
  * \brief The abstract base class for all signal processing blocks.
@@ -50,202 +51,215 @@
 
 class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_shared_from_this<gr_basic_block>
 {
-    typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
-
-private:
-    /*
-     * This function is called by the runtime system to dispatch messages.
-     *
-     * The thread-safety guarantees mentioned in set_msg_handler are implemented
-     * by the callers of this method.
-     */
-    void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
-    {
-        // AA Update this
-      if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
-        d_msg_handlers[which_port](msg); // Yes, invoke it.
-    };
-
-    //msg_handler_t	 d_msg_handler;
-    typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t;
-    d_msg_handlers_t d_msg_handlers;
-   
-    typedef std::deque<pmt::pmt_t>    msg_queue_t;
-    typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>    msg_queue_map_t;
-    typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>::iterator msg_queue_map_itr;
-    msg_queue_map_t msg_queue;
-//    boost::condition_variable msg_queue_ready;
-    std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready;
-
-    gruel::mutex          mutex;          //< protects all vars
-
-
-protected:
-    friend class gr_flowgraph;
-    friend class gr_flat_flowgraph; // TODO: will be redundant
-    friend class gr_tpb_thread_body;
-
-    enum vcolor { WHITE, GREY, BLACK };
-
-    std::string          d_name;
-    gr_io_signature_sptr d_input_signature;
-    gr_io_signature_sptr d_output_signature;
-    long                 d_unique_id;
-    long                 d_symbolic_id;
-    std::string          d_symbol_name;
-    std::string          d_symbol_alias;
-    vcolor               d_color;
-
-    gr_basic_block(void){} //allows pure virtual interface sub-classes
-
-    //! Protected constructor prevents instantiation by non-derived classes
-    gr_basic_block(const std::string &name,
-                   gr_io_signature_sptr input_signature,
-                   gr_io_signature_sptr output_signature);
-
-    //! may only be called during constructor
-    void set_input_signature(gr_io_signature_sptr iosig) {
-        d_input_signature = iosig;
+  typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
+  
+ private:
+  /*
+   * This function is called by the runtime system to dispatch messages.
+   *
+   * The thread-safety guarantees mentioned in set_msg_handler are implemented
+   * by the callers of this method.
+   */
+  void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
+  {
+    // AA Update this
+    if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
+      d_msg_handlers[which_port](msg); // Yes, invoke it.
+  };
+  
+  //msg_handler_t	 d_msg_handler;
+  typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t;
+  d_msg_handlers_t d_msg_handlers;
+  
+  typedef std::deque<pmt::pmt_t>    msg_queue_t;
+  typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>    msg_queue_map_t;
+  typedef std::map<pmt::pmt_t, msg_queue_t, pmt::pmt_comperator>::iterator msg_queue_map_itr;
+  std::map<pmt::pmt_t, boost::shared_ptr<boost::condition_variable>, pmt::pmt_comperator> msg_queue_ready;
+  
+  gruel::mutex          mutex;          //< protects all vars
+  
+ protected:
+  friend class gr_flowgraph;
+  friend class gr_flat_flowgraph; // TODO: will be redundant
+  friend class gr_tpb_thread_body;
+  
+  enum vcolor { WHITE, GREY, BLACK };
+  
+  std::string          d_name;
+  gr_io_signature_sptr d_input_signature;
+  gr_io_signature_sptr d_output_signature;
+  long                 d_unique_id;
+  long                 d_symbolic_id;
+  std::string          d_symbol_name;
+  std::string          d_symbol_alias;
+  vcolor               d_color;
+  msg_queue_map_t msg_queue;
+  
+  gr_basic_block(void){} //allows pure virtual interface sub-classes
+  
+  //! Protected constructor prevents instantiation by non-derived classes
+  gr_basic_block(const std::string &name,
+		 gr_io_signature_sptr input_signature,
+		 gr_io_signature_sptr output_signature);
+  
+  //! may only be called during constructor
+  void set_input_signature(gr_io_signature_sptr iosig) {
+    d_input_signature = iosig;
+  }
+  
+  //! may only be called during constructor
+  void set_output_signature(gr_io_signature_sptr iosig) {
+    d_output_signature = iosig;
+  }
+  
+  /*!
+   * \brief Allow the flowgraph to set for sorting and partitioning
+   */
+  void set_color(vcolor color) { d_color = color; }
+  vcolor color() const { return d_color; }
+  
+  // Message passing interface
+  pmt::pmt_t message_subscribers;
+  
+ public:
+  virtual ~gr_basic_block();
+  long unique_id() const { return d_unique_id; }
+  long symbolic_id() const { return d_symbolic_id; }
+  std::string name() const { return d_name; }
+  std::string symbol_name() const { return d_symbol_name; }
+  gr_io_signature_sptr input_signature() const  { return d_input_signature; }
+  gr_io_signature_sptr output_signature() const { return d_output_signature; }
+  gr_basic_block_sptr to_basic_block(); // Needed for Python type coercion
+  bool alias_set() { return !d_symbol_alias.empty(); }
+  std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); }
+  pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); }
+  void set_block_alias(std::string name);
+  
+  // ** Message passing interface **
+  void message_port_register_in(pmt::pmt_t port_id);
+  void message_port_register_out(pmt::pmt_t port_id);
+  void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg);
+  void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);
+  void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);
+  
+  virtual bool message_port_is_hier(pmt::pmt_t port_id) { std::cout << "is_hier\n"; return false; }
+  virtual bool message_port_is_hier_in(pmt::pmt_t port_id) { std::cout << "is_hier_in\n"; return false; }
+  virtual bool message_port_is_hier_out(pmt::pmt_t port_id) { std::cout << "is_hier_out\n"; return false; }
+  
+  /*!
+   * \brief Get input message port names.
+   *
+   * Returns the available input message ports for a block. The
+   * return object is a PMT vector that is filled with PMT symbols.
+   */
+  pmt::pmt_t message_ports_in();
+  
+  /*!
+   * \brief Get output message port names.
+   *
+   * Returns the available output message ports for a block. The
+   * return object is a PMT vector that is filled with PMT symbols.
+   */
+  pmt::pmt_t message_ports_out();
+  
+  /*!
+   * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
+   */
+  void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
+  
+  //! is the queue empty?
+  //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); }
+  bool empty_p(pmt::pmt_t which_port) { 
+    if(msg_queue.find(which_port) == msg_queue.end())
+      throw std::runtime_error("port does not exist!");
+    return msg_queue[which_port].empty(); 
+  }
+  bool empty_p() { 
+    bool rv = true;
+    BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
+    return rv;
+  }
+  
+  //| Acquires and release the mutex
+  void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg);
+  /*!
+   * \returns returns pmt at head of queue or pmt_t() if empty.
+   */
+  pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
+  
+  /*!
+   * \returns returns pmt at head of queue or pmt_t() if empty.
+   */
+  pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port);
+  
+  msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
+    return msg_queue[which_port].begin();
+  }
+
+  void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){
+    msg_queue[which_port].erase(it);
+  }
+  
+  virtual bool has_msg_port(pmt::pmt_t which_port){
+    if(msg_queue.find(which_port) != msg_queue.end()){
+      return true;
     }
-
-    //! may only be called during constructor
-    void set_output_signature(gr_io_signature_sptr iosig) {
-        d_output_signature = iosig;
-    }
-
-    /*!
-     * \brief Allow the flowgraph to set for sorting and partitioning
-     */
-    void set_color(vcolor color) { d_color = color; }
-    vcolor color() const { return d_color; }
-
-    // Message passing interface
-    pmt::pmt_t message_subscribers;
-
-public:
-    virtual ~gr_basic_block();
-    long unique_id() const { return d_unique_id; }
-    long symbolic_id() const { return d_symbolic_id; }
-    std::string name() const { return d_name; }
-    std::string symbol_name() const { return d_symbol_name; }
-    gr_io_signature_sptr input_signature() const  { return d_input_signature; }
-    gr_io_signature_sptr output_signature() const { return d_output_signature; }
-    gr_basic_block_sptr to_basic_block(); // Needed for Python type coercion
-    bool alias_set() { return !d_symbol_alias.empty(); }
-    std::string alias(){ return alias_set()?d_symbol_alias:symbol_name(); }
-    pmt::pmt_t alias_pmt(){ return pmt::pmt_intern(alias()); }
-    void set_block_alias(std::string name);
-
-    // ** Message passing interface **
-    void message_port_register_in(pmt::pmt_t port_id);
-    void message_port_register_out(pmt::pmt_t port_id);
-    void message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg);
-    void message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target);
-    void message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target);
-
-    /*!
-     * \brief Get input message port names.
-     *
-     * Returns the available input message ports for a block. The
-     * return object is a PMT vector that is filled with PMT symbols.
-     */
-    pmt::pmt_t message_ports_in();
-
-    /*!
-     * \brief Get output message port names.
-     *
-     * Returns the available output message ports for a block. The
-     * return object is a PMT vector that is filled with PMT symbols.
-     */
-    pmt::pmt_t message_ports_out();
-
-    /*!
-     * Accept msg, place in queue, arrange for thread to be awakened if it's not already.
-     */
-    void _post(pmt::pmt_t which_port, pmt::pmt_t msg);
-
-    //! is the queue empty?
-    //bool empty_p(const pmt::pmt_t &which_port) const { return msg_queue[which_port].empty(); }
-    bool empty_p(pmt::pmt_t which_port) { 
-        if(msg_queue.find(which_port) == msg_queue.end())
-            throw std::runtime_error("port does not exist!");
-        return msg_queue[which_port].empty(); 
-        }
-    bool empty_p() { 
-        bool rv = true;
-        BOOST_FOREACH(msg_queue_map_t::value_type &i, msg_queue){ rv &= msg_queue[i.first].empty(); }
-        return rv;
-        }
-
-    //| Acquires and release the mutex
-    void insert_tail( pmt::pmt_t which_port, pmt::pmt_t msg);
-    /*!
-     * \returns returns pmt at head of queue or pmt_t() if empty.
-     */
-    pmt::pmt_t delete_head_nowait( pmt::pmt_t which_port);
-
-    /*!
-     * \returns returns pmt at head of queue or pmt_t() if empty.
-     */
-    pmt::pmt_t delete_head_blocking( pmt::pmt_t which_port);
-
-    msg_queue_t::iterator get_iterator(pmt::pmt_t which_port){
-        return msg_queue[which_port].begin();
-        }
-    void erase_msg(pmt::pmt_t which_port, msg_queue_t::iterator it){
-        msg_queue[which_port].erase(it);
-        }
-
-
-    /*!
-     * \brief Confirm that ninputs and noutputs is an acceptable combination.
-     *
-     * \param ninputs	number of input streams connected
-     * \param noutputs	number of output streams connected
-     *
-     * \returns true if this is a valid configuration for this block.
-     *
-     * This function is called by the runtime system whenever the
-     * topology changes.  Most classes do not need to override this.
-     * This check is in addition to the constraints specified by the input
-     * and output gr_io_signatures.
-     */
-    virtual bool check_topology(int ninputs, int noutputs) { return true; }
-
-    /*!
-     * \brief Set the callback that is fired when messages are available.
-     *
-     * \p msg_handler can be any kind of function pointer or function object
-     * that has the signature:
-     * <pre>
-     *    void msg_handler(pmt::pmt msg);
-     * </pre>
-     *
-     * (You may want to use boost::bind to massage your callable into the
-     * correct form.  See gr_nop.{h,cc} for an example that sets up a class
-     * method as the callback.)
-     *
-     * Blocks that desire to handle messages must call this method in their
-     * constructors to register the handler that will be invoked when messages
-     * are available.
-     *
-     * If the block inherits from gr_block, the runtime system will ensure that
-     * msg_handler is called in a thread-safe manner, such that work and
-     * msg_handler will never be called concurrently.  This allows msg_handler
-     * to update state variables without having to worry about thread-safety
-     * issues with work, general_work or another invocation of msg_handler.
-     *
-     * If the block inherits from gr_hier_block2, the runtime system will
-     * ensure that no reentrant calls are made to msg_handler.
-     */
-    //template <typename T> void set_msg_handler(T msg_handler){
-    //  d_msg_handler = msg_handler_t(msg_handler);
-    //}
-    template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){
-      if(msg_queue.find(which_port) == msg_queue.end()){ 
-            throw std::runtime_error("attempt to set_msg_handler() on bad input message port!"); }
-      d_msg_handlers[which_port] = msg_handler_t(msg_handler);
+    if(pmt::pmt_dict_has_key(message_subscribers, which_port)){
+      return true;
     }
+    return false;
+  }
+  
+  
+  /*!
+   * \brief Confirm that ninputs and noutputs is an acceptable combination.
+   *
+   * \param ninputs	number of input streams connected
+   * \param noutputs	number of output streams connected
+   *
+   * \returns true if this is a valid configuration for this block.
+   *
+   * This function is called by the runtime system whenever the
+   * topology changes.  Most classes do not need to override this.
+   * This check is in addition to the constraints specified by the input
+   * and output gr_io_signatures.
+   */
+  virtual bool check_topology(int ninputs, int noutputs) { return true; }
+  
+  /*!
+   * \brief Set the callback that is fired when messages are available.
+   *
+   * \p msg_handler can be any kind of function pointer or function object
+   * that has the signature:
+   * <pre>
+   *    void msg_handler(pmt::pmt msg);
+   * </pre>
+   *
+   * (You may want to use boost::bind to massage your callable into the
+   * correct form.  See gr_nop.{h,cc} for an example that sets up a class
+   * method as the callback.)
+   *
+   * Blocks that desire to handle messages must call this method in their
+   * constructors to register the handler that will be invoked when messages
+   * are available.
+   *
+   * If the block inherits from gr_block, the runtime system will ensure that
+   * msg_handler is called in a thread-safe manner, such that work and
+   * msg_handler will never be called concurrently.  This allows msg_handler
+   * to update state variables without having to worry about thread-safety
+   * issues with work, general_work or another invocation of msg_handler.
+   *
+   * If the block inherits from gr_hier_block2, the runtime system will
+   * ensure that no reentrant calls are made to msg_handler.
+   */
+  //template <typename T> void set_msg_handler(T msg_handler){
+  //  d_msg_handler = msg_handler_t(msg_handler);
+  //}
+  template <typename T> void set_msg_handler(pmt::pmt_t which_port, T msg_handler){
+    if(msg_queue.find(which_port) == msg_queue.end()){ 
+      throw std::runtime_error("attempt to set_msg_handler() on bad input message port!"); }
+    d_msg_handlers[which_port] = msg_handler_t(msg_handler);
+  }
 };
 
 inline bool operator<(gr_basic_block_sptr lhs, gr_basic_block_sptr rhs)
@@ -260,8 +274,8 @@ GR_CORE_API long gr_basic_block_ncurrently_allocated();
 
 inline std::ostream &operator << (std::ostream &os, gr_basic_block_sptr basic_block)
 {
-    os << basic_block->name() << "(" << basic_block->unique_id() << ")";
-    return os;
+  os << basic_block->name() << "(" << basic_block->unique_id() << ")";
+  return os;
 }
 
 #endif /* INCLUDED_GR_BASIC_BLOCK_H */
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
index e04deb9485..c19863f347 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.cc
@@ -31,8 +31,9 @@
 #include <volk/volk.h>
 #include <iostream>
 #include <map>
+#include <boost/format.hpp>
 
-#define GR_FLAT_FLOWGRAPH_DEBUG 0
+#define GR_FLAT_FLOWGRAPH_DEBUG  0
 
 // 32Kbyte buffer size between blocks
 #define GR_FIXED_BUFFER_SIZE (32*(1L<<10))
@@ -71,6 +72,15 @@ gr_flat_flowgraph::setup_connections()
     block->set_is_unaligned(false);
   }
 
+  // Connect message ports connetions
+  for(gr_msg_edge_viter_t i = d_msg_edges.begin(); i != d_msg_edges.end(); i++){
+    if(GR_FLAT_FLOWGRAPH_DEBUG)
+        std::cout << boost::format("flat_fg connecting msg primitives: (%s, %s)->(%s, %s)\n") %
+                    i->src().block() % i->src().port() %
+                    i->dst().block() % i->dst().port();
+    i->src().block()->message_port_sub( i->src().port(), pmt::pmt_cons(i->dst().block()->alias_pmt(), i->dst().port()) );
+    }
+
 }
 
 gr_block_detail_sptr
@@ -350,3 +360,25 @@ gr_flat_flowgraph::make_block_vector(gr_basic_block_vector_t &blocks)
 
   return result;
 }
+
+
+void gr_flat_flowgraph::replace_endpoint(const gr_msg_endpoint &e, const gr_msg_endpoint &r, bool is_src){
+    size_t n_replr(0);
+    if(GR_FLAT_FLOWGRAPH_DEBUG)
+        std::cout << boost::format("gr_flat_flowgraph::replace_endpoint( %s, %s, %d )\n") % e.block()% r.block()% is_src;
+    for(size_t i=0; i<d_msg_edges.size(); i++){
+        if(is_src){
+            if(d_msg_edges[i].src() == e){
+                d_msg_edges[i] = gr_msg_edge(r, d_msg_edges[i].dst() );
+                n_replr++;
+            }
+        } else {
+            if(d_msg_edges[i].dst() == e){
+                d_msg_edges[i] = gr_msg_edge(d_msg_edges[i].src(), r );
+                n_replr++;
+            }
+        }
+    }
+//    std::cout << "n_repl = " << n_repl <<"\n";
+}
+
diff --git a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
index 0926bcc8f3..52f2023347 100644
--- a/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flat_flowgraph.h
@@ -46,7 +46,7 @@ public:
 
   // Wire gr_blocks together in new flat_flowgraph
   void setup_connections();
-
+  
   // Merge applicable connections from existing flat flowgraph
   void merge_connections(gr_flat_flowgraph_sptr sfg);
 
@@ -57,6 +57,8 @@ public:
    */
   static gr_block_vector_t make_block_vector(gr_basic_block_vector_t &blocks);
 
+  void replace_endpoint(const gr_msg_endpoint &e, const gr_msg_endpoint &r, bool is_src);
+
 private:
   gr_flat_flowgraph();
 
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
index 69c304a3d8..63a2084802 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.cc
@@ -149,6 +149,16 @@ gr_flowgraph::check_valid_port(gr_io_signature_sptr sig, int port)
   }
 }
 
+void 
+gr_flowgraph::check_valid_port(const gr_msg_endpoint &e)
+{
+    if (GR_FLOWGRAPH_DEBUG)
+        std::cout << "check_valid_port( " << e.block() << ", " << e.port() << ")\n";
+
+    if(!e.block()->has_msg_port(e.port()))
+        throw std::invalid_argument("invalid msg port in connect() or disconnect()");
+}
+
 void
 gr_flowgraph::check_dst_not_used(const gr_endpoint &dst)
 {
@@ -181,8 +191,10 @@ gr_flowgraph::calc_used_blocks()
   gr_basic_block_vector_t tmp;
 
   // make sure free standing message blocks are included
-  for (gr_basic_block_vector_t::iterator it=d_msgblocks.begin(); it!=d_msgblocks.end(); it++){
-    tmp.push_back(*it);
+  for (gr_msg_edge_viter_t p = d_msg_edges.begin(); p != d_msg_edges.end(); p++) {
+//  for now only blocks receiving messages get a thread context - uncomment to allow senders to also obtain one
+//    tmp.push_back(p->src().block());
+    tmp.push_back(p->dst().block());
   }
 
   // Collect all blocks in the edge list
@@ -477,7 +489,27 @@ gr_flowgraph::topological_dfs_visit(gr_basic_block_sptr block, gr_basic_block_ve
   output.push_back(block);
 }
 
-void gr_flowgraph::add_msg_block(gr_basic_block_sptr blk){
-    d_msgblocks.push_back(blk);
+void gr_flowgraph::connect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst){
+  check_valid_port(src);
+  check_valid_port(dst);
+  for (gr_msg_edge_viter_t p = d_msg_edges.begin(); p != d_msg_edges.end(); p++) {
+    if(p->src() == src && p->dst() == dst){ 
+        throw std::runtime_error("connect called on already connected edge!");
+        }
+    }
+  d_msg_edges.push_back(gr_msg_edge(src,dst));
 }
 
+void gr_flowgraph::disconnect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst){
+  check_valid_port(src);
+  check_valid_port(dst);
+  for (gr_msg_edge_viter_t p = d_msg_edges.begin(); p != d_msg_edges.end(); p++) {
+    if(p->src() == src && p->dst() == dst){
+        d_msg_edges.erase(p);
+        return;
+        }
+  }
+  throw std::runtime_error("disconnect called on non-connected edge!");
+}
+
+
diff --git a/gnuradio-core/src/lib/runtime/gr_flowgraph.h b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
index 860cb0ff1e..bef70f626f 100644
--- a/gnuradio-core/src/lib/runtime/gr_flowgraph.h
+++ b/gnuradio-core/src/lib/runtime/gr_flowgraph.h
@@ -52,6 +52,31 @@ inline bool gr_endpoint::operator==(const gr_endpoint &other) const
 	  d_port == other.d_port);
 }
 
+class GR_CORE_API gr_msg_endpoint
+{
+private:
+  gr_basic_block_sptr d_basic_block;
+  pmt::pmt_t d_port;
+  bool d_is_hier;
+public:
+  gr_msg_endpoint() : d_basic_block(), d_port(pmt::PMT_NIL) { }
+  gr_msg_endpoint(gr_basic_block_sptr block, pmt::pmt_t port, bool is_hier=false){ d_basic_block = block; d_port = port; d_is_hier = is_hier;}
+  gr_basic_block_sptr block() const { return d_basic_block; }
+  pmt::pmt_t port() const { return d_port; }
+  bool is_hier() const { return d_is_hier; }
+  void set_hier(bool h) { d_is_hier = h; }
+
+  bool operator==(const gr_msg_endpoint &other) const;
+
+};
+
+inline bool gr_msg_endpoint::operator==(const gr_msg_endpoint &other) const
+{
+  return (d_basic_block == other.d_basic_block &&
+	  pmt::pmt_equal(d_port, other.d_port));
+}
+
+
 // Hold vectors of gr_endpoint objects
 typedef std::vector<gr_endpoint> gr_endpoint_vector_t;
 typedef std::vector<gr_endpoint>::iterator gr_endpoint_viter_t;
@@ -75,11 +100,35 @@ private:
   gr_endpoint d_dst;
 };
 
+
 // Hold vectors of gr_edge objects
 typedef std::vector<gr_edge> gr_edge_vector_t;
 typedef std::vector<gr_edge>::iterator gr_edge_viter_t;
 
 
+/*!
+ *\brief Class representing a msg connection between to graph msg endpoints
+ *
+ */
+class GR_CORE_API gr_msg_edge
+{
+public:
+  gr_msg_edge() : d_src(), d_dst() { };
+  gr_msg_edge(const gr_msg_endpoint &src, const gr_msg_endpoint &dst) : d_src(src), d_dst(dst) { }
+  ~gr_msg_edge() {}
+
+  const gr_msg_endpoint &src() const { return d_src; }
+  const gr_msg_endpoint &dst() const { return d_dst; }
+
+private:
+  gr_msg_endpoint d_src;
+  gr_msg_endpoint d_dst;
+};
+
+// Hold vectors of gr_edge objects
+typedef std::vector<gr_msg_edge> gr_msg_edge_vector_t;
+typedef std::vector<gr_msg_edge>::iterator gr_msg_edge_viter_t;
+
 // Create a shared pointer to a heap allocated flowgraph
 // (types defined in gr_runtime_types.h)
 GR_CORE_API gr_flowgraph_sptr gr_make_flowgraph();
@@ -110,7 +159,11 @@ public:
   void disconnect(gr_basic_block_sptr src_block, int src_port,
 		  gr_basic_block_sptr dst_block, int dst_port);
 
-  void add_msg_block(gr_basic_block_sptr blk);
+  // Connect two msg endpoints
+  void connect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst);
+  
+  // Disconnect two msg endpoints
+  void disconnect(const gr_msg_endpoint &src, const gr_msg_endpoint &dst);
 
   // Validate connectivity, raise exception if invalid
   void validate();
@@ -120,6 +173,9 @@ public:
 
   // Return vector of edges
   const gr_edge_vector_t &edges() const { return d_edges; }
+  
+  // Return vector of msg edges
+  const gr_msg_edge_vector_t &msg_edges() const { return d_msg_edges; }
 
   // Return vector of connected blocks
   gr_basic_block_vector_t calc_used_blocks();
@@ -130,11 +186,11 @@ public:
   // Return vector of vectors of disjointly connected blocks, topologically
   // sorted.
   std::vector<gr_basic_block_vector_t> partition();
-  gr_basic_block_vector_t d_msgblocks;
 
 protected:
   gr_basic_block_vector_t d_blocks;
   gr_edge_vector_t d_edges;
+  gr_msg_edge_vector_t d_msg_edges;
 
   gr_flowgraph();
   std::vector<int> calc_used_ports(gr_basic_block_sptr block, bool check_inputs);
@@ -146,6 +202,7 @@ protected:
 private:
 
   void check_valid_port(gr_io_signature_sptr sig, int port);
+  void check_valid_port(const gr_msg_endpoint &e);
   void check_dst_not_used(const gr_endpoint &dst);
   void check_type_match(const gr_endpoint &src, const gr_endpoint &dst);
   gr_edge_vector_t calc_connections(gr_basic_block_sptr block, bool check_inputs); // false=use outputs
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc
index a19bfe1954..8c2794c63c 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.cc
@@ -44,7 +44,9 @@ gr_hier_block2::gr_hier_block2(const std::string &name,
                                gr_io_signature_sptr input_signature,
                                gr_io_signature_sptr output_signature)
   : gr_basic_block(name, input_signature, output_signature),
-    d_detail(new gr_hier_block2_detail(this))
+    d_detail(new gr_hier_block2_detail(this)),
+    hier_message_ports_in(pmt::PMT_NIL),
+    hier_message_ports_out(pmt::PMT_NIL)
 {
   // This bit of magic ensures that self() works in the constructors of derived classes.
   gnuradio::detail::sptr_magic::create_and_stash_initial_sptr(this);
@@ -141,6 +143,7 @@ gr_hier_block2::unlock()
   d_detail->unlock();
 }
 
+
 gr_flat_flowgraph_sptr
 gr_hier_block2::flatten() const
 {
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.h b/gnuradio-core/src/lib/runtime/gr_hier_block2.h
index e8364a740b..f80dd73e4b 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2.h
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.h
@@ -166,6 +166,39 @@ public:
   gr_flat_flowgraph_sptr flatten() const;
 
   gr_hier_block2_sptr to_hier_block2(); // Needed for Python type coercion
+
+  bool has_msg_port(pmt::pmt_t which_port){
+    return message_port_is_hier(which_port) || gr_basic_block::has_msg_port(which_port);
+    }   
+  
+  bool message_port_is_hier(pmt::pmt_t port_id){
+    return message_port_is_hier_in(port_id) || message_port_is_hier_out(port_id);
+    }
+  bool message_port_is_hier_in(pmt::pmt_t port_id){
+    return pmt::pmt_list_has(hier_message_ports_in, port_id);
+    }
+  bool message_port_is_hier_out(pmt::pmt_t port_id){
+    return pmt::pmt_list_has(hier_message_ports_out, port_id);
+    }
+
+  pmt::pmt_t hier_message_ports_in;
+  pmt::pmt_t hier_message_ports_out;
+
+  void message_port_register_hier_in(pmt::pmt_t port_id){
+    if(pmt::pmt_list_has(hier_message_ports_in, port_id))
+        throw std::invalid_argument("hier msg in port by this name already registered");
+    if(msg_queue.find(port_id) != msg_queue.end())
+        throw std::invalid_argument("block already has a primitive input port by this name");
+    hier_message_ports_in = pmt::pmt_list_add(hier_message_ports_in, port_id);
+    }
+  void message_port_register_hier_out(pmt::pmt_t port_id){
+    if(pmt::pmt_list_has(hier_message_ports_out, port_id))
+        throw std::invalid_argument("hier msg out port by this name already registered");
+    if(pmt::pmt_dict_has_key(message_subscribers, port_id))
+        throw std::invalid_argument("block already has a primitive output port by this name");
+    hier_message_ports_out = pmt::pmt_list_add(hier_message_ports_out, port_id);
+    }
+
 };
 
 inline gr_hier_block2_sptr cast_to_hier_block2_sptr(gr_basic_block_sptr block) {
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2.i b/gnuradio-core/src/lib/runtime/gr_hier_block2.i
index 7c0e62f288..a857394ca7 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2.i
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2.i
@@ -40,6 +40,8 @@ gr_hier_block2_sptr gr_make_hier_block2(const std::string name,
 %rename(primitive_disconnect) gr_hier_block2::disconnect;
 %rename(primitive_msg_connect) gr_hier_block2::msg_connect;
 %rename(primitive_msg_disconnect) gr_hier_block2::msg_disconnect;
+%rename(primitive_message_port_register_hier_in) gr_hier_block2::message_port_register_hier_in;
+%rename(primitive_message_port_register_hier_out) gr_hier_block2::message_port_register_hier_out;
 
 class gr_hier_block2 : public gr_basic_block
 {
@@ -78,5 +80,9 @@ public:
   void lock();
   void unlock();
 
+  void message_port_register_hier_in(pmt::pmt_t port_id);
+  void message_port_register_hier_out(pmt::pmt_t port_id);
+
+
   gr_hier_block2_sptr to_hier_block2(); // Needed for Python type coercion
 };
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
index ff2a5db8cc..e70553ddc1 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.cc
@@ -27,6 +27,7 @@
 #include <gr_io_signature.h>
 #include <stdexcept>
 #include <sstream>
+#include <boost/format.hpp>
 
 #define GR_HIER_BLOCK2_DETAIL_DEBUG 0
 
@@ -53,6 +54,7 @@ gr_hier_block2_detail::gr_hier_block2_detail(gr_hier_block2 *owner) :
   d_outputs = gr_endpoint_vector_t(max_outputs);
 }
 
+
 gr_hier_block2_detail::~gr_hier_block2_detail()
 {
   d_owner = 0; // Don't use delete, we didn't allocate
@@ -151,15 +153,39 @@ gr_hier_block2_detail::msg_connect(gr_basic_block_sptr src, pmt::pmt_t srcport,
     std::cout << "connecting message port..." << std::endl;
     
   // register the subscription
-  src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
+// this is done later...
+//  src->message_port_sub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
 
   // add block uniquely to list to internal blocks
   if (std::find(d_blocks.begin(), d_blocks.end(), dst) == d_blocks.end()){
+    d_blocks.push_back(src);
     d_blocks.push_back(dst);
     }
 
-  // make sure we instantiate a thread for this block
-  d_fg->add_msg_block(dst);
+  bool hier_out = (d_owner == src.get()) && src->message_port_is_hier_out(srcport);;
+  bool hier_in = (d_owner == dst.get()) && dst->message_port_is_hier_in(dstport);
+
+  gr_hier_block2_sptr src_block(cast_to_hier_block2_sptr(src));
+  gr_hier_block2_sptr dst_block(cast_to_hier_block2_sptr(dst));
+
+  if (src_block && src.get() != d_owner) {
+    if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+      std::cout << "connect: src is hierarchical, setting parent to " << this << std::endl;
+    src_block->d_detail->d_parent_detail = this;
+  }
+
+  if (dst_block && dst.get() != d_owner) {
+    if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+      std::cout << "connect: dst is hierarchical, setting parent to " << this << std::endl;
+    dst_block->d_detail->d_parent_detail = this;
+  }
+
+  // add edge for this message connection
+  if(GR_HIER_BLOCK2_DETAIL_DEBUG)
+        std::cout << boost::format("connect( (%s, %s, %d), (%s, %s, %d) )\n") %
+                        src % srcport % hier_out %
+                        dst % dstport % hier_in;
+  d_fg->connect( gr_msg_endpoint(src, srcport, hier_out), gr_msg_endpoint(dst, dstport, hier_in));
 }
 
 void
@@ -169,8 +195,13 @@ gr_hier_block2_detail::msg_disconnect(gr_basic_block_sptr src, pmt::pmt_t srcpor
   if (GR_HIER_BLOCK2_DETAIL_DEBUG)
     std::cout << "disconnecting message port..." << std::endl;
     
-  // register the subscription
+  // unregister the subscription - if already subscribed
   src->message_port_unsub(srcport, pmt::pmt_cons(dst->alias_pmt(), dstport));
+
+  // remove edge for this message connection
+  bool hier_out = (d_owner == src.get()) && src->message_port_is_hier_out(srcport);;
+  bool hier_in = (d_owner == dst.get()) && dst->message_port_is_hier_in(dstport);
+  d_fg->disconnect( gr_msg_endpoint(src, srcport, hier_out), gr_msg_endpoint(dst, dstport, hier_in));
 }
 
 void
@@ -435,11 +466,16 @@ void
 gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const
 {
   if (GR_HIER_BLOCK2_DETAIL_DEBUG)
-    std::cout << "Flattening " << d_owner->name() << std::endl;
+    std::cout << " ** Flattening " << d_owner->name() << std::endl;
 
   // Add my edges to the flow graph, resolving references to actual endpoints
   gr_edge_vector_t edges = d_fg->edges();
+  gr_msg_edge_vector_t msg_edges = d_fg->msg_edges();
   gr_edge_viter_t p;
+  gr_msg_edge_viter_t q,u;
+
+  if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+    std::cout << "Flattening stream connections: " << std::endl;
 
   for (p = edges.begin(); p != edges.end(); p++) {
     if (GR_HIER_BLOCK2_DETAIL_DEBUG)
@@ -457,7 +493,46 @@ gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const
       }
     }
   }
-  sfg->d_msgblocks = d_fg->d_msgblocks;
+
+  // loop through flattening hierarchical connections
+  if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+    std::cout << "Flattening msg connections: " << std::endl;
+
+  for(q = msg_edges.begin(); q != msg_edges.end(); q++) {
+    if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+      std::cout << boost::format(" flattening edge ( %s, %s, %d) -> ( %s, %s, %d)\n") % q->src().block() % q->src().port() % q->src().is_hier() % q->dst().block() % q->dst().port() % q->dst().is_hier();
+
+    bool normal_connection = true;
+
+    // resolve existing connections to hier ports
+    if(q->dst().is_hier()){
+        if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+          std::cout << boost::format("  resolve hier output (%s, %s)") % q->dst().block() % q->dst().port() << std::endl;
+        sfg->replace_endpoint( q->dst(), q->src(), true );
+        normal_connection = false;
+        }
+
+    if(q->src().is_hier()){
+        if (GR_HIER_BLOCK2_DETAIL_DEBUG)
+          std::cout << boost::format("  resolve hier input (%s, %s)") % q->src().block() % q->src().port() << std::endl;
+        sfg->replace_endpoint( q->src(), q->dst(), false );
+        normal_connection = false;
+        } 
+
+    // propogate non hier connections through
+    if(normal_connection){
+        sfg->connect( q->src(), q->dst() );
+        } 
+    }
+
+/*  // connect primitive edges in the new fg
+  for(q = msg_edges.begin(); q != msg_edges.end(); q++) {
+    if( (!q->src().is_hier()) && (!q->dst().is_hier()) ){
+        sfg->connect( q->src(), q->dst() );
+        } else {
+        std::cout << "not connecting hier connection!" << std::endl;
+        }
+    }*/
 
   // Construct unique list of blocks used either in edges, inputs,
   // outputs, or by themselves.  I still hate STL.
@@ -499,7 +574,7 @@ gr_hier_block2_detail::flatten_aux(gr_flat_flowgraph_sptr sfg) const
   // Recurse hierarchical children
   for (gr_basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
     gr_hier_block2_sptr hier_block2(cast_to_hier_block2_sptr(*p));
-    if (hier_block2) {
+    if (hier_block2 && (hier_block2.get() != d_owner)) {
       if (GR_HIER_BLOCK2_DETAIL_DEBUG)
 	std::cout << "flatten_aux: recursing into hierarchical block " << hier_block2 << std::endl;
       hier_block2->d_detail->flatten_aux(sfg);
@@ -530,3 +605,4 @@ gr_hier_block2_detail::unlock()
   else
     d_owner->unlock();
 }
+
diff --git a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h
index f2d2b3c4e8..b38dae3016 100644
--- a/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h
+++ b/gnuradio-core/src/lib/runtime/gr_hier_block2_detail.h
@@ -61,6 +61,7 @@ private:
   gr_endpoint_vector_t d_outputs;             // Single internal endpoint per external output
   gr_basic_block_vector_t d_blocks;
 
+
   void connect_input(int my_port, int port, gr_basic_block_sptr block);
   void connect_output(int my_port, int port, gr_basic_block_sptr block);
   void disconnect_input(int my_port, int port, gr_basic_block_sptr block);
diff --git a/gnuradio-core/src/python/gnuradio/gr/hier_block2.py b/gnuradio-core/src/python/gnuradio/gr/hier_block2.py
index 0c45f1691d..f5f0c00f53 100644
--- a/gnuradio-core/src/python/gnuradio/gr/hier_block2.py
+++ b/gnuradio-core/src/python/gnuradio/gr/hier_block2.py
@@ -20,6 +20,7 @@
 #
 
 from gnuradio_core import hier_block2_swig
+from gruel import pmt
 
 #
 # This hack forces a 'has-a' relationship to look like an 'is-a' one.
@@ -111,3 +112,15 @@ class hier_block2(object):
         self._hb.primitive_disconnect(src_block.to_basic_block(), src_port,
                                       dst_block.to_basic_block(), dst_port)
 
+    def msg_connect(self, src, srcport, dst, dstport):
+        self.primitive_msg_connect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport);
+
+    def msg_disconnect(self, src, srcport, dst, dstport):
+        self.primitive_msg_disconnect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport);
+
+    def message_port_register_hier_in(self, portname):
+        self.primitive_message_port_register_hier_in(pmt.pmt_intern(portname));
+
+    def message_port_register_hier_out(self, portname):
+        self.primitive_message_port_register_hier_out(pmt.pmt_intern(portname));
+
diff --git a/grc/blocks/block_tree.xml b/grc/blocks/block_tree.xml
index 95bd7bb3ce..183883959d 100644
--- a/grc/blocks/block_tree.xml
+++ b/grc/blocks/block_tree.xml
@@ -43,6 +43,8 @@
         <block>gr_message_strobe</block>
         <block>gr_pdu_to_tagged_stream</block>
         <block>gr_tagged_stream_to_pdu</block>
+        <block>gr_tuntap_pdu</block>
+        <block>gr_socket_pdu</block>
     </cat>
 	<cat>
 		<name>Operators</name>
diff --git a/grc/blocks/gr_message_debug.xml b/grc/blocks/gr_message_debug.xml
index 705a7cc5f3..4d73fbd9cc 100644
--- a/grc/blocks/gr_message_debug.xml
+++ b/grc/blocks/gr_message_debug.xml
@@ -19,4 +19,9 @@
 		<type>message</type>
 		<optional>1</optional>
 	</sink>
+	<sink>
+		<name>print_pdu_verbose</name>
+		<type>message</type>
+        <optional>1</optional>
+	</sink>
 </block>
diff --git a/grc/blocks/gr_socket_pdu.xml b/grc/blocks/gr_socket_pdu.xml
new file mode 100644
index 0000000000..a175c36991
--- /dev/null
+++ b/grc/blocks/gr_socket_pdu.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0"?>
+<!--
+###################################################
+## Socket PDU Message source/sink
+###################################################
+ -->
+<block>
+	<name>Socket PDU</name>
+	<key>gr_socket_pdu</key>
+	<import>from gnuradio import gr</import>
+	<make>gr.socket_pdu($type, $host, $port, $mtu)</make>
+    <param>
+        <name>Type</name>
+        <key>type</key>
+        <value>TCP_SERVER</value>
+        <type>enum</type>
+        <option>
+            <name>TCP Server</name>
+            <key>"TCP_SERVER"</key>
+        </option>
+        <option>
+            <name>TCP Client</name>
+            <key>"TCP_CLIENT"</key>
+        </option>
+        <option>
+            <name>UDP Server</name>
+            <key>"UDP_SERVER"</key>
+        </option>
+        <option>
+            <name>UDP Client</name>
+            <key>"UDP_CLIENT"</key>
+        </option>
+    </param>
+    <param>
+        <name>Host</name>
+        <key>host</key>
+        <value></value>
+        <type>string</type>
+    </param>
+    <param>
+        <name>Port</name>
+        <key>port</key>
+        <value>52001</value>
+        <type>string</type>
+    </param>
+    <param>
+        <name>MTU</name>
+        <key>mtu</key>
+        <value>10000</value>
+        <type>int</type>
+    </param>
+	<sink>
+		<name>pdus</name>
+		<type>message</type>
+        <optional>1</optional>
+	</sink>
+	<source>
+		<name>pdus</name>
+		<type>message</type>
+        <optional>1</optional>
+	</source>
+</block>
diff --git a/grc/blocks/gr_tuntap_pdu.xml b/grc/blocks/gr_tuntap_pdu.xml
new file mode 100644
index 0000000000..f169345afa
--- /dev/null
+++ b/grc/blocks/gr_tuntap_pdu.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0"?>
+<!--
+###################################################
+## Tuntap PDU Message source/sink
+###################################################
+ -->
+<block>
+	<name>TunTap PDU</name>
+	<key>gr_tuntap_pdu</key>
+	<import>from gnuradio import gr</import>
+	<make>gr.tuntap_pdu($ifn, $mtu)</make>
+    <param>
+        <name>Interface Name</name>
+        <key>ifn</key>
+        <value>tun0</value>
+        <type>string</type>
+    </param>
+    <param>
+        <name>MTU</name>
+        <key>mtu</key>
+        <value>10000</value>
+        <type>int</type>
+    </param>
+	<sink>
+		<name>pdus</name>
+		<type>message</type>
+        <optional>1</optional>
+	</sink>
+	<source>
+		<name>pdus</name>
+		<type>message</type>
+        <optional>1</optional>
+	</source>
+</block>
diff --git a/grc/blocks/pad_sink.xml b/grc/blocks/pad_sink.xml
index f89eaa53c5..f0e10a3391 100644
--- a/grc/blocks/pad_sink.xml
+++ b/grc/blocks/pad_sink.xml
@@ -7,7 +7,9 @@
 <block>
 	<name>Pad Sink</name>
 	<key>pad_sink</key>
-	<make></make>
+    <make>#if str($type) == "message"
+None;self.message_port_register_hier_in($label)
+#end if</make>
 	<param>
 		<name>Label</name>
 		<key>label</key>
@@ -43,6 +45,11 @@
 			<key>byte</key>
 			<opt>size:gr.sizeof_char</opt>
 		</option>
+		<option>
+			<name>Message</name>
+			<key>message</key>
+			<opt>size:0</opt>
+		</option>
 		<option>
 			<name>Wildcard</name>
 			<key></key>
diff --git a/grc/blocks/pad_source.xml b/grc/blocks/pad_source.xml
index cbf38eb390..a56a65dcc3 100644
--- a/grc/blocks/pad_source.xml
+++ b/grc/blocks/pad_source.xml
@@ -7,7 +7,9 @@
 <block>
 	<name>Pad Source</name>
 	<key>pad_source</key>
-	<make></make>
+    <make>#if str($type) == "message"
+None;self.message_port_register_hier_out($label)
+#end if</make>
 	<param>
 		<name>Label</name>
 		<key>label</key>
@@ -43,6 +45,11 @@
 			<key>byte</key>
 			<opt>size:gr.sizeof_char</opt>
 		</option>
+		<option>
+			<name>Message</name>
+			<key>message</key>
+			<opt>size:0</opt>
+		</option>
 		<option>
 			<name>Wildcard</name>
 			<key></key>
diff --git a/grc/python/FlowGraph.py b/grc/python/FlowGraph.py
index efe362760c..376c2e337f 100644
--- a/grc/python/FlowGraph.py
+++ b/grc/python/FlowGraph.py
@@ -58,6 +58,8 @@ class FlowGraph(_FlowGraph, _GUIFlowGraph):
 			'in': self.get_pad_sources(),
 			'out': self.get_pad_sinks(),
 		}[direction]
+        # we only want stream ports
+		sorted_pads = filter(lambda b: b.get_param('type').get_evaluated() != 'message', sorted_pads);
 		#load io signature
 		return [{
 			'label': str(pad.get_param('label').get_evaluated()),
@@ -83,6 +85,14 @@ class FlowGraph(_FlowGraph, _GUIFlowGraph):
 		pads = filter(lambda b: b.get_key() == 'pad_sink', self.get_enabled_blocks())
 		return sorted(pads, lambda x, y: cmp(x.get_id(), y.get_id()))
 
+	def get_msg_pad_sources(self):
+		ps = self.get_pad_sources();
+		return filter(lambda b: b.get_param('type').get_evaluated() == 'message', ps);
+
+	def get_msg_pad_sinks(self):
+		ps = self.get_pad_sinks();
+		return filter(lambda b: b.get_param('type').get_evaluated() == 'message', ps);
+
 	def get_imports(self):
 		"""
 		Get a set of all import statments in this flow graph namespace.
diff --git a/grc/python/convert_hier.py b/grc/python/convert_hier.py
index b609af24ae..508ec63b2b 100644
--- a/grc/python/convert_hier.py
+++ b/grc/python/convert_hier.py
@@ -25,6 +25,8 @@ def convert_hier(flow_graph, python_file):
 	#extract info from the flow graph
 	input_sigs = flow_graph.get_io_signaturev('in')
 	output_sigs = flow_graph.get_io_signaturev('out')
+	input_msgp = flow_graph.get_msg_pad_sources();
+	output_msgp = flow_graph.get_msg_pad_sinks();
 	parameters = flow_graph.get_parameters()
 	block_key = flow_graph.get_option('id')
 	block_name = flow_graph.get_option('title') or flow_graph.get_option('id').replace('_', ' ').title()
@@ -55,7 +57,7 @@ def convert_hier(flow_graph, python_file):
 		param_n['type'] = 'raw'
 		params_n.append(param_n)
 	block_n['param'] = params_n
-	#sink data
+	#sink data stream ports
 	block_n['sink'] = list()
 	for input_sig in input_sigs:
 		sink_n = odict()
@@ -64,7 +66,14 @@ def convert_hier(flow_graph, python_file):
 		sink_n['vlen'] = input_sig['vlen']
 		if input_sig['optional']: sink_n['optional'] = '1'
 		block_n['sink'].append(sink_n)
-	#source data
+	#sink data msg ports
+	for input_sig in input_msgp:
+		sink_n = odict()
+		sink_n['name'] = input_sig.get_param("label").get_value();
+		sink_n['type'] = "message"
+		sink_n['optional'] = input_sig.get_param("optional").get_value();
+		block_n['sink'].append(sink_n)
+	#source data stream ports
 	block_n['source'] = list()
 	for output_sig in output_sigs:
 		source_n = odict()
@@ -73,6 +82,13 @@ def convert_hier(flow_graph, python_file):
 		source_n['vlen'] = output_sig['vlen']
 		if output_sig['optional']: source_n['optional'] = '1'
 		block_n['source'].append(source_n)
+	#source data msg ports
+	for output_sig in output_msgp:
+		source_n = odict()
+		source_n['name'] = output_sig.get_param("label").get_value();
+		source_n['type'] = "message"
+		source_n['optional'] = output_sig.get_param("optional").get_value();
+		block_n['source'].append(source_n)
 	#doc data
 	block_n['doc'] = "%s\n%s\n%s"%(block_author, block_desc, python_file)
 	block_n['grc_source'] = "%s"%(flow_graph.grc_file_path)
diff --git a/grc/python/flow_graph.tmpl b/grc/python/flow_graph.tmpl
index af55ad641a..163e7f76aa 100644
--- a/grc/python/flow_graph.tmpl
+++ b/grc/python/flow_graph.tmpl
@@ -189,6 +189,7 @@ gr.io_signaturev($(len($io_sigs)), $(len($io_sigs)), [$(', '.join($size_strs))])
 		self.connect($make_port_sig($source), $make_port_sig($sink))
 	#end if
 #end for
+
 ########################################################
 ##Create Asynch Message Connections
 ########################################################
@@ -198,7 +199,21 @@ gr.io_signaturev($(len($io_sigs)), $(len($io_sigs)), [$(', '.join($size_strs))])
 		$DIVIDER
 #end if
 #for $msg in $messages2
-		self.msg_connect(self.$msg.get_source().get_parent().get_id(), "$msg.get_source().get_name()", self.$msg.get_sink().get_parent().get_id(), "$msg.get_sink().get_name()")
+		#set $sr = $msg.get_source()
+		#set $source = "self.%s"%($sr.get_parent().get_id())
+		#set $source_port = $sr.get_name();
+		#if $sr.get_parent().get_key() == "pad_source"
+			#set $source = "self"
+			#set $source_port = $sr.get_parent().get_param("label").get_value();
+		#end if
+		#set $sk = $msg.get_sink()
+		#set $sink = "self.%s"%($sk.get_parent().get_id())
+		#set $sink_port = $sk.get_name();
+		#if $sk.get_parent().get_key() == "pad_sink"
+			#set $sink = "self"
+			#set $sink_port = $sk.get_parent().get_param("label").get_value();
+		#end if
+		self.msg_connect($source, "$source_port", $sink, "$sink_port")
 #end for
 
 ########################################################
diff --git a/gruel/src/include/gruel/pmt.h b/gruel/src/include/gruel/pmt.h
index a462155c5f..d09686783c 100644
--- a/gruel/src/include/gruel/pmt.h
+++ b/gruel/src/include/gruel/pmt.h
@@ -734,6 +734,12 @@ GRUEL_API pmt_t pmt_list_add(pmt_t list, const pmt_t& item);
  */
 GRUEL_API pmt_t pmt_list_rm(pmt_t list, const pmt_t& item);
 
+/*!
+ * \brief Return bool of \p list contains \p item
+ */
+GRUEL_API bool pmt_list_has(pmt_t list, const pmt_t& item);
+
+
 /*
  * ------------------------------------------------------------------------
  *			     read / write
diff --git a/gruel/src/lib/pmt/pmt.cc b/gruel/src/lib/pmt/pmt.cc
index 3eb39ed7b1..e5baca98a8 100644
--- a/gruel/src/lib/pmt/pmt.cc
+++ b/gruel/src/lib/pmt/pmt.cc
@@ -1340,6 +1340,22 @@ pmt_list_rm(pmt_t list, const pmt_t& item)
     }
 }
 
+bool
+pmt_list_has(pmt_t list, const pmt_t& item)
+{
+  if(pmt_is_pair(list)){
+    pmt_t left = pmt_car(list);
+    pmt_t right = pmt_cdr(list);
+    if(pmt_equal(left,item))
+        return true;
+    return pmt_list_has(right, item);   
+  } else {
+    if(pmt_is_null(list))
+        return false;
+    throw std::runtime_error("list contains invalid format!");
+  }
+}
+
 pmt_t
 pmt_caar(pmt_t pair)
 {
diff --git a/gruel/src/swig/pmt_swig.i b/gruel/src/swig/pmt_swig.i
index d46143424b..b1628c9983 100644
--- a/gruel/src/swig/pmt_swig.i
+++ b/gruel/src/swig/pmt_swig.i
@@ -701,6 +701,11 @@ pmt_t pmt_list_add(pmt_t list, const pmt_t& item);
  */
 pmt_t pmt_list_rm(pmt_t list, const pmt_t& item);
 
+/*!
+ * \brief Return bool of \p list contains \p item
+ */
+bool pmt_list_has(pmt_t list, const pmt_t& item);
+
 /*
  * ------------------------------------------------------------------------
  *			     read / write
diff --git a/volk/CMakeLists.txt b/volk/CMakeLists.txt
index 68385f9740..9519505eb9 100644
--- a/volk/CMakeLists.txt
+++ b/volk/CMakeLists.txt
@@ -38,12 +38,8 @@ set(CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake) #location for custom "M
 # Environment setup
 ########################################################################
 IF(NOT DEFINED BOOST_ROOT)
-    SET(BOOST_ROOT "")
+    SET(BOOST_ROOT ${CMAKE_INSTALL_PREFIX})
 ENDIF()
-SET(BOOST_ROOT ${BOOST_ROOT} CACHE STRING "Modify search path for Boost components")
-
-#after caching user-defined value, make sure to add the install prefix
-SET(BOOST_ROOT ${BOOST_ROOT}:${CMAKE_INSTALL_PREFIX})
 
 IF(NOT DEFINED CROSSCOMPILE_MULTILIB)
     SET(CROSSCOMPILE_MULTILIB "")
@@ -77,6 +73,10 @@ set(Boost_ADDITIONAL_VERSIONS
 )
 find_package(Boost COMPONENTS unit_test_framework)
 
+if(NOT Boost_FOUND)
+    message(FATAL_ERROR "VOLK Requires boost to build")
+endif()
+
 find_package(ORC)
 
 ########################################################################
diff --git a/volk/apps/CMakeLists.txt b/volk/apps/CMakeLists.txt
index 175105a5a3..a89a9409d8 100644
--- a/volk/apps/CMakeLists.txt
+++ b/volk/apps/CMakeLists.txt
@@ -18,7 +18,7 @@
 ########################################################################
 # Setup profiler
 ########################################################################
-find_package(Boost)
+find_package(Boost COMPONENTS unit_test_framework)
 
 if(Boost_FOUND AND UNIX) #uses mkdir and $HOME
 
-- 
cgit v1.2.3


From ee53cca50e3f39ddc1d44669c3ea9f0d73d32022 Mon Sep 17 00:00:00 2001
From: "Brett L. Trotter" <blt@webtrotter.com>
Date: Tue, 11 Dec 2012 21:53:08 -0500
Subject: core: Patch to file source to allow opening of new files while
 running.

Addresses issues #352.
---
 gnuradio-core/src/lib/io/gr_file_source.cc | 96 ++++++++++++++++++++++++------
 gnuradio-core/src/lib/io/gr_file_source.h  | 64 ++++++++++++++++----
 gnuradio-core/src/lib/io/gr_file_source.i  |  2 +
 grc/blocks/gr_file_source.xml              |  1 +
 4 files changed, 131 insertions(+), 32 deletions(-)

(limited to 'grc')

diff --git a/gnuradio-core/src/lib/io/gr_file_source.cc b/gnuradio-core/src/lib/io/gr_file_source.cc
index 96333fa24c..09f3986cd2 100644
--- a/gnuradio-core/src/lib/io/gr_file_source.cc
+++ b/gnuradio-core/src/lib/io/gr_file_source.cc
@@ -49,24 +49,14 @@
 #define	OUR_O_LARGEFILE 0
 #endif
 
-gr_file_source::gr_file_source (size_t itemsize, const char *filename, bool repeat)
-  : gr_sync_block ("file_source",
-		   gr_make_io_signature (0, 0, 0),
-		   gr_make_io_signature (1, 1, itemsize)),
-    d_itemsize (itemsize), d_fp (0), d_repeat (repeat)
+gr_file_source::gr_file_source(size_t itemsize, const char *filename, bool repeat)
+  : gr_sync_block("file_source",
+		  gr_make_io_signature (0, 0, 0),
+		  gr_make_io_signature (1, 1, itemsize)),
+    d_itemsize(itemsize), d_fp(0), d_new_fp (0), d_repeat(repeat),
+    d_updated(false)
 {
-  // we use "open" to use to the O_LARGEFILE flag
-
-  int fd;
-  if ((fd = open (filename, O_RDONLY | OUR_O_LARGEFILE | OUR_O_BINARY)) < 0){
-    perror (filename);
-    throw std::runtime_error ("can't open file");
-  }
-
-  if ((d_fp = fdopen (fd, "rb")) == NULL){
-    perror (filename);
-    throw std::runtime_error ("can't open file");
-  }
+  open(filename, repeat);
 }
 
 // public constructor that returns a shared_ptr
@@ -79,7 +69,11 @@ gr_make_file_source (size_t itemsize, const char *filename, bool repeat)
 
 gr_file_source::~gr_file_source ()
 {
-  fclose ((FILE *) d_fp);
+  close();
+  if(d_fp) {
+    fclose(d_fp);
+    d_fp = 0;
+  }
 }
 
 int
@@ -91,6 +85,11 @@ gr_file_source::work (int noutput_items,
   int i;
   int size = noutput_items;
 
+  do_update();       // update d_fp is reqd
+  if(d_fp == NULL)
+    throw std::runtime_error("work with file not open");
+
+  boost::mutex::scoped_lock lock(fp_mutex); // hold for the rest of this function
   while (size) {
     i = fread(o, d_itemsize, size, (FILE *) d_fp);
 
@@ -129,5 +128,64 @@ gr_file_source::work (int noutput_items,
 bool
 gr_file_source::seek (long seek_point, int whence)
 {
-   return fseek ((FILE *) d_fp, seek_point * d_itemsize, whence) == 0;
+  // obtain exclusive access for duration of this function
+  boost::mutex::scoped_lock lock(fp_mutex);
+  return fseek((FILE *) d_fp, seek_point * d_itemsize, whence) == 0;
+}
+
+void
+gr_file_source::open(const char *filename, bool repeat)
+{
+  // obtain exclusive access for duration of this function
+  boost::mutex::scoped_lock     lock(fp_mutex);
+
+  int fd;
+
+  // we use "open" to use to the O_LARGEFILE flag
+  if((fd = ::open(filename, O_RDONLY | OUR_O_LARGEFILE | OUR_O_BINARY)) < 0) {
+    perror(filename);
+    throw std::runtime_error("can't open file");
+  }
+
+  if(d_new_fp) {
+    fclose(d_new_fp);
+    d_new_fp = 0;
+  }
+
+  if((d_new_fp = fdopen (fd, "rb")) == NULL) {
+    perror(filename);
+    ::close(fd);	// don't leak file descriptor if fdopen fails
+    throw std::runtime_error("can't open file");
+  }
+
+  d_updated = true;
+  d_repeat = repeat;
+}
+
+void
+gr_file_source::close()
+{
+  // obtain exclusive access for duration of this function
+  boost::mutex::scoped_lock lock(fp_mutex);
+
+  if(d_new_fp != NULL) {
+    fclose(d_new_fp);
+    d_new_fp = NULL;
+  }
+  d_updated = true;
+}
+
+void
+gr_file_source::do_update()
+{
+  if(d_updated) {
+    boost::mutex::scoped_lock lock(fp_mutex); // hold while in scope
+
+    if(d_fp)
+      fclose(d_fp);
+
+    d_fp = d_new_fp;                    // install new file pointer
+    d_new_fp = 0;
+    d_updated = false;
+  }
 }
diff --git a/gnuradio-core/src/lib/io/gr_file_source.h b/gnuradio-core/src/lib/io/gr_file_source.h
index 1cc44a3b1f..0478fba04b 100644
--- a/gnuradio-core/src/lib/io/gr_file_source.h
+++ b/gnuradio-core/src/lib/io/gr_file_source.h
@@ -25,6 +25,7 @@
 
 #include <gr_core_api.h>
 #include <gr_sync_block.h>
+#include <boost/thread/mutex.hpp>
 
 class gr_file_source;
 typedef boost::shared_ptr<gr_file_source> gr_file_source_sptr;
@@ -39,31 +40,68 @@ gr_make_file_source (size_t itemsize, const char *filename, bool repeat = false)
 
 class GR_CORE_API gr_file_source : public gr_sync_block
 {
-  friend GR_CORE_API gr_file_source_sptr gr_make_file_source (size_t itemsize,
-						  const char *filename,
-						  bool repeat);
  private:
-  size_t	d_itemsize;
-  void	       *d_fp;
-  bool		d_repeat;
+  size_t d_itemsize;
+  FILE *d_fp;
+  FILE *d_new_fp;
+  bool d_repeat;
+  bool d_updated;
 
  protected:
-  gr_file_source (size_t itemsize, const char *filename, bool repeat);
+  gr_file_source(size_t itemsize, const char *filename, bool repeat);
+
+  void do_update();
+
+  boost::mutex fp_mutex;
 
  public:
-  ~gr_file_source ();
+  /*!
+   * \brief Create a file source.
+   *
+   * Opens \p filename as a source of items into a flowgraph. The data
+   * is expected to be in binary format, item after item. The \p
+   * itemsize of the block determines the conversion from bits to
+   * items.
+   *
+   * If \p repeat is turned on, the file will repeat the file after
+   * it's reached the end.
+   *
+   * \param itemsize	the size of each item in the file, in bytes
+   * \param filename	name of the file to source from
+   * \param repeat	repeat file from start
+   */
+  friend GR_CORE_API gr_file_source_sptr
+    gr_make_file_source(size_t itemsize,
+			const char *filename,
+			bool repeat);
+
+  ~gr_file_source();
 
-  int work (int noutput_items,
-	    gr_vector_const_void_star &input_items,
-	    gr_vector_void_star &output_items);
+  int work(int noutput_items,
+	   gr_vector_const_void_star &input_items,
+	   gr_vector_void_star &output_items);
 
   /*!
-   * \brief seek file to \p seek_point relative to \p whence
+   * \brief Seek file to \p seek_point relative to \p whence
    *
    * \param seek_point	sample offset in file
    * \param whence	one of SEEK_SET, SEEK_CUR, SEEK_END (man fseek)
    */
-  bool seek (long seek_point, int whence);
+  bool seek(long seek_point, int whence);
+
+  /*!
+   * \brief Opens a new file.
+   *
+   * \param filename	name of the file to source from
+   * \param repeat	repeat file from start
+   */
+  void open(const char *filename, bool repeat);
+
+  /*!
+   * \brief Close the file handle.
+   */
+  void close();
+
 };
 
 #endif /* INCLUDED_GR_FILE_SOURCE_H */
diff --git a/gnuradio-core/src/lib/io/gr_file_source.i b/gnuradio-core/src/lib/io/gr_file_source.i
index 9bf44691d0..e71cef0d14 100644
--- a/gnuradio-core/src/lib/io/gr_file_source.i
+++ b/gnuradio-core/src/lib/io/gr_file_source.i
@@ -40,4 +40,6 @@ class gr_file_source : public gr_sync_block
   ~gr_file_source ();
 
   bool seek (long seek_point, int whence);
+  void open (const char *filename, bool repeat);
+  void close();
 };
diff --git a/grc/blocks/gr_file_source.xml b/grc/blocks/gr_file_source.xml
index fcc7a70401..5f0e16b279 100644
--- a/grc/blocks/gr_file_source.xml
+++ b/grc/blocks/gr_file_source.xml
@@ -9,6 +9,7 @@
 	<key>gr_file_source</key>
 	<import>from gnuradio import gr</import>
 	<make>gr.file_source($type.size*$vlen, $file, $repeat)</make>
+	<callback>open($file, $repeat)</callback>
 	<param>
 		<name>File</name>
 		<key>file</key>
-- 
cgit v1.2.3