From 97ff82dd63f06ad51992222c160a9673260b13f7 Mon Sep 17 00:00:00 2001
From: Ben Reynwar <ben@reynwar.net>
Date: Tue, 22 Jan 2013 12:46:01 -0700
Subject: core: Add min_noutput_items to gr_block.

---
 gnuradio-core/src/lib/runtime/gr_block.cc          |  5 ++--
 gnuradio-core/src/lib/runtime/gr_block.h           | 20 ++++++++++++++-
 gnuradio-core/src/lib/runtime/gr_block_executor.cc | 29 +++++++++++-----------
 3 files changed, 36 insertions(+), 18 deletions(-)

(limited to 'gnuradio-core/src')

diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc
index 5ba30955f9..b39a680c59 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -31,8 +31,8 @@
 #include <gr_block_registry.h>
 
 gr_block::gr_block (const std::string &name,
-		    gr_io_signature_sptr input_signature,
-		    gr_io_signature_sptr output_signature)
+                    gr_io_signature_sptr input_signature,
+                    gr_io_signature_sptr output_signature)
   : gr_basic_block(name, input_signature, output_signature),
     d_output_multiple (1),
     d_output_multiple_set(false),
@@ -43,6 +43,7 @@ gr_block::gr_block (const std::string &name,
     d_fixed_rate(false),
     d_max_noutput_items_set(false),
     d_max_noutput_items(0),
+    d_min_noutput_items(0),
     d_tag_propagation_policy(TPP_ALL_TO_ALL),
     d_max_output_buffer(std::max(output_signature->max_streams(),1), -1),
     d_min_output_buffer(std::max(output_signature->max_streams(),1), -1)
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h
index 7a70bdaf09..52cf723a99 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -251,6 +251,23 @@ class GR_CORE_API gr_block : public gr_basic_block {
    */
   void set_tag_propagation_policy(tag_propagation_policy_t p);
 
+  /*!
+   * \brief Return the minimum number of output items this block can
+   * produce during a call to work.
+   *
+   * Should be 0 for most blocks.  Useful if we're dealing with packets and
+   * the block produces one packet per call to work.
+  */
+  int min_noutput_items() const { return d_min_noutput_items; }
+
+  /*!
+   * \brief Set the minimum number of output items this block can
+   * produce during a call to work.
+   *
+   * \param m the minimum noutput_items this block can produce.
+   */
+  void set_min_noutput_items(int m) { d_min_noutput_items = m; }
+
   /*!
    * \brief Return the maximum number of output items this block will
    * handle during a call to work.
@@ -258,7 +275,7 @@ class GR_CORE_API gr_block : public gr_basic_block {
   int max_noutput_items();
 
   /*!
-   * \brief Set the maximum number of ouput items htis block will
+   * \brief Set the maximum number of output items this block will
    * handle during a call to work.
    *
    * \param m the maximum noutput_items this block will handle.
@@ -370,6 +387,7 @@ class GR_CORE_API gr_block : public gr_basic_block {
   gr_block_detail_sptr	d_detail;		// implementation details
   unsigned              d_history;
   bool                  d_fixed_rate;
+  int                   d_min_noutput_items;
   bool                  d_max_noutput_items_set;     // if d_max_noutput_items is valid
   int                   d_max_noutput_items;         // value of max_noutput_items for this block
   tag_propagation_policy_t d_tag_propagation_policy; // policy for moving tags downstream
diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.cc b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
index 375b58f563..8aadb201a7 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_executor.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
@@ -64,22 +64,21 @@ round_down (unsigned int n, unsigned int multiple)
 // on is done.
 //
 static int
-min_available_space (gr_block_detail *d, int output_multiple)
+min_available_space (gr_block_detail *d, int output_multiple, int min_noutput_items)
 {
-  int	min_space = std::numeric_limits<int>::max();
-
+  int min_space = std::numeric_limits<int>::max();
+  if (min_noutput_items == 0)
+    min_noutput_items = 1;
   for (int i = 0; i < d->noutputs (); i++){
     gruel::scoped_lock guard(*d->output(i)->mutex());
-#if 0
-    int n = round_down(d->output(i)->space_available(), output_multiple);
-#else
-    int n = round_down(std::min(d->output(i)->space_available(),
-				d->output(i)->bufsize()/2),
-		       output_multiple);
-#endif
-    if (n == 0){			// We're blocked on output.
-      if (d->output(i)->done()){	// Downstream is done, therefore we're done.
-	return -1;
+    int avail_n = round_down(d->output(i)->space_available(), output_multiple);
+    int best_n = round_down(d->output(i)->bufsize()/2, output_multiple);
+    if (best_n < min_noutput_items)
+      throw std::runtime_error("Buffer too small for min_noutput_items");
+    int n = std::min(avail_n, best_n);
+    if (n < min_noutput_items){  // We're blocked on output.
+      if (d->output(i)->done()){ // Downstream is done, therefore we're done.
+        return -1;
       }
       return 0;
     }
@@ -205,7 +204,7 @@ gr_block_executor::run_one_iteration()
     d_start_nitems_read.resize(0);
 
     // determine the minimum available output space
-    noutput_items = min_available_space (d, m->output_multiple ());
+    noutput_items = min_available_space (d, m->output_multiple (), m->min_noutput_items ());
     noutput_items = std::min(noutput_items, max_noutput_items);
     LOG(*d_log << " source\n  noutput_items = " << noutput_items << std::endl);
     if (noutput_items == -1)		// we're done
@@ -286,7 +285,7 @@ gr_block_executor::run_one_iteration()
     }
 
     // determine the minimum available output space
-    noutput_items = min_available_space (d, m->output_multiple ());
+    noutput_items = min_available_space (d, m->output_multiple (), m->min_noutput_items ());
     if (ENABLE_LOGGING){
       *d_log << " regular ";
       if (m->relative_rate() >= 1.0)
-- 
cgit v1.2.3


From 5ab960295f00991fa9447819b3ff9eaf8d88d28e Mon Sep 17 00:00:00 2001
From: Roy Thompson <rthompso@gmail.com>
Date: Thu, 31 Jan 2013 13:16:12 -0700
Subject: core: Enabling msg_connect within python blocks.

---
 gnuradio-core/src/lib/general/gr_block_gateway.h   |  52 +++++++++
 gnuradio-core/src/lib/general/gr_feval.cc          |  16 +++
 gnuradio-core/src/lib/general/gr_feval.h           |  29 +++++
 gnuradio-core/src/lib/general/gr_feval.i           |  29 +++++
 gnuradio-core/src/lib/runtime/gr_basic_block.h     |  25 +++--
 gnuradio-core/src/python/gnuradio/gr/gateway.py    |  28 +++++
 .../gnuradio/gr/qa_python_message_passing.py       | 123 +++++++++++++++++++++
 7 files changed, 290 insertions(+), 12 deletions(-)
 create mode 100644 gnuradio-core/src/python/gnuradio/gr/qa_python_message_passing.py

(limited to 'gnuradio-core/src')

diff --git a/gnuradio-core/src/lib/general/gr_block_gateway.h b/gnuradio-core/src/lib/general/gr_block_gateway.h
index ae91d41b59..c876ea8e14 100644
--- a/gnuradio-core/src/lib/general/gr_block_gateway.h
+++ b/gnuradio-core/src/lib/general/gr_block_gateway.h
@@ -188,6 +188,58 @@ public:
         gr_block::get_tags_in_range(tags, which_input, abs_start, abs_end, key);
         return tags;
     }
+
+    /* Message passing interface */
+    void gr_block__message_port_register_in(pmt::pmt_t port_id){
+        gr_basic_block::message_port_register_in(port_id);
+    }
+
+    void gr_block__message_port_register_out(pmt::pmt_t port_id){
+        gr_basic_block::message_port_register_out(port_id);
+    }
+
+    void gr_block__message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg){
+        gr_basic_block::message_port_pub(port_id, msg);
+    }
+
+    void gr_block__message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target){
+        gr_basic_block::message_port_sub(port_id, target);
+    }
+
+    void gr_block__message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target){
+        gr_basic_block::message_port_unsub(port_id, target);
+    }
+    
+    pmt::pmt_t gr_block__message_ports_in(){
+        return gr_basic_block::message_ports_in();
+    }
+
+    pmt::pmt_t gr_block__message_ports_out(){
+        return gr_basic_block::message_ports_out();
+    }
+
+    void set_msg_handler_feval(pmt::pmt_t which_port, gr_feval_p *msg_handler)
+    {
+        if(msg_queue.find(which_port) == msg_queue.end()){ 
+	    throw std::runtime_error("attempt to set_msg_handler_feval() on bad input message port!"); 
+	}
+	d_msg_handlers_feval[which_port] = msg_handler;
+    }
+
+protected:
+    typedef std::map<pmt::pmt_t, gr_feval_p *, pmt::pmt_comperator> msg_handlers_feval_t;
+    msg_handlers_feval_t d_msg_handlers_feval;
+
+    void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg){
+        // Is there a handler?
+        if (d_msg_handlers_feval.find(which_port) != d_msg_handlers_feval.end()){
+    	    d_msg_handlers_feval[which_port]->calleval(msg); // Yes, invoke it.
+        }
+        else {
+	    // Pass to generic dispatcher if not found
+	    gr_basic_block::dispatch_msg(which_port, msg);
+        }
+    }
 };
 
 /*!
diff --git a/gnuradio-core/src/lib/general/gr_feval.cc b/gnuradio-core/src/lib/general/gr_feval.cc
index ca5714a796..89f09984cf 100644
--- a/gnuradio-core/src/lib/general/gr_feval.cc
+++ b/gnuradio-core/src/lib/general/gr_feval.cc
@@ -88,6 +88,22 @@ gr_feval::calleval(void)
   eval();
 }
 
+// ----------------------------------------------------------------
+
+gr_feval_p::~gr_feval_p(){}
+
+void
+gr_feval_p::eval(pmt::pmt_t x)
+{
+  // nop
+}
+
+void
+gr_feval_p::calleval(pmt::pmt_t x)
+{
+  eval(x);
+}
+
 /*
  * Trivial examples showing C++ (transparently) calling Python
  */
diff --git a/gnuradio-core/src/lib/general/gr_feval.h b/gnuradio-core/src/lib/general/gr_feval.h
index 1726a8a7f9..a9bccfe51c 100644
--- a/gnuradio-core/src/lib/general/gr_feval.h
+++ b/gnuradio-core/src/lib/general/gr_feval.h
@@ -24,6 +24,7 @@
 
 #include <gr_core_api.h>
 #include <gr_complex.h>
+#include <gruel/pmt.h>
 
 /*!
  * \brief base class for evaluating a function: double -> double
@@ -137,6 +138,34 @@ public:
   virtual void calleval();	// invoke "eval"
 };
 
+/*!
+ * \brief base class for evaluating a function: pmt -> void
+ * \ingroup misc
+ *
+ * This class is designed to be subclassed in Python or C++
+ * and is callable from both places.  It uses SWIG's
+ * "director" feature to implement the magic.
+ * It's slow. Don't use it in a performance critical path.
+ *
+ * Override eval to define the behavior.
+ * Use calleval to invoke eval (this kludge is required to allow a
+ * python specific "shim" to be inserted.
+ */
+class GR_CORE_API gr_feval_p
+{
+protected:
+  /*!
+   * \brief override this to define the function
+   */
+  virtual void eval(pmt::pmt_t x);
+
+public:
+  gr_feval_p() {}
+  virtual ~gr_feval_p();
+
+  virtual void calleval(pmt::pmt_t x);	// invoke "eval"
+};
+
 /*!
  * \brief trivial examples / test cases showing C++ calling Python code
  */
diff --git a/gnuradio-core/src/lib/general/gr_feval.i b/gnuradio-core/src/lib/general/gr_feval.i
index bc219a6431..bcf4f1e646 100644
--- a/gnuradio-core/src/lib/general/gr_feval.i
+++ b/gnuradio-core/src/lib/general/gr_feval.i
@@ -45,23 +45,28 @@
 
 // Directors are only supported in Python, Java and C#
 #ifdef SWIGPYTHON
+%include "pmt_swig.i"
+using namespace pmt;
 
 // Enable SWIG directors for these classes
 %feature("director") gr_py_feval_dd;
 %feature("director") gr_py_feval_cc;
 %feature("director") gr_py_feval_ll;
 %feature("director") gr_py_feval;
+%feature("director") gr_py_feval_p;
 
 %feature("nodirector") gr_py_feval_dd::calleval;
 %feature("nodirector") gr_py_feval_cc::calleval;
 %feature("nodirector") gr_py_feval_ll::calleval;
 %feature("nodirector") gr_py_feval::calleval;
+%feature("nodirector") gr_py_feval_p::calleval;
 
 
 %rename(feval_dd) gr_py_feval_dd;
 %rename(feval_cc) gr_py_feval_cc;
 %rename(feval_ll) gr_py_feval_ll;
 %rename(feval)    gr_py_feval;
+%rename(feval_p)  gr_py_feval_p;
 
 //%exception {
 //  try { $action }
@@ -136,12 +141,26 @@ public:
   virtual void calleval();
 };
 
+%ignore gr_feval_p;
+class gr_feval_p
+{
+protected:
+  virtual void eval(pmt_t x);
+
+public:
+  gr_feval_p() {}
+  virtual ~gr_feval_p();
+
+  virtual void calleval(pmt_t x);
+};
+
 /*
  * These are the ones to derive from in Python.  They have the magic shim
  * that ensures that we're holding the Python GIL when we enter Python land...
  */
 
 %inline %{
+#include <gruel/pmt.h>
 
 class gr_py_feval_dd : public gr_feval_dd
 {
@@ -183,6 +202,16 @@ class gr_py_feval : public gr_feval
   }
 };
 
+class gr_py_feval_p : public gr_feval_p
+{
+ public:
+  void calleval(pmt::pmt_t x)
+  {
+    ensure_py_gil_state _lock;
+    eval(x);
+  }
+};
+
 %}
 
 
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index 9cc2ad7755..b4935d8ac8 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -54,18 +54,6 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_
   typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
   
  private:
-  /*
-   * This function is called by the runtime system to dispatch messages.
-   *
-   * The thread-safety guarantees mentioned in set_msg_handler are implemented
-   * by the callers of this method.
-   */
-  void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
-  {
-    // AA Update this
-    if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
-      d_msg_handlers[which_port](msg); // Yes, invoke it.
-  };
   
   //msg_handler_t	 d_msg_handler;
   typedef std::map<pmt::pmt_t , msg_handler_t, pmt::pmt_comperator> d_msg_handlers_t;
@@ -117,6 +105,19 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_
    */
   void set_color(vcolor color) { d_color = color; }
   vcolor color() const { return d_color; }
+
+  /*
+   * This function is called by the runtime system to dispatch messages.
+   *
+   * The thread-safety guarantees mentioned in set_msg_handler are implemented
+   * by the callers of this method.
+   */
+  virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
+  {
+    // AA Update this
+    if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
+      d_msg_handlers[which_port](msg); // Yes, invoke it.
+  };
   
   // Message passing interface
   pmt::pmt_t message_subscribers;
diff --git a/gnuradio-core/src/python/gnuradio/gr/gateway.py b/gnuradio-core/src/python/gnuradio/gr/gateway.py
index 244b8b5925..c25755bb57 100644
--- a/gnuradio-core/src/python/gnuradio/gr/gateway.py
+++ b/gnuradio-core/src/python/gnuradio/gr/gateway.py
@@ -59,6 +59,24 @@ class gateway_handler(gr_core.feval_ll):
             raise ex
         return 0
 
+########################################################################
+# Handler that does callbacks from C++
+########################################################################
+class msg_handler(gr_core.feval_p):
+
+    #dont put a constructor, it wont work
+
+    def init(self, callback):
+        self._callback = callback
+
+    def eval(self, arg):
+        try: self._callback(arg)
+        except Exception as ex:
+            print("handler caught exception: %s"%ex)
+            import traceback; traceback.print_exc()
+            raise ex
+        return 0
+
 ########################################################################
 # The guts that make this into a gr block
 ########################################################################
@@ -91,6 +109,9 @@ class gateway_block(object):
             self.__handler, name, gr_in_sig, gr_out_sig, work_type, factor)
         self.__message = self.__gateway.gr_block_message()
 
+        #dict to keep references to all message handlers
+        self.__msg_handlers = {}
+
         #register gr_block functions
         prefix = 'gr_block__'
         for attr in [x for x in dir(self.__gateway) if x.startswith(prefix)]:
@@ -171,6 +192,13 @@ class gateway_block(object):
     def start(self): return True
     def stop(self): return True
 
+    def set_msg_handler(self, which_port, handler_func):
+        handler = msg_handler()
+        handler.init(handler_func)
+        self.__gateway.set_msg_handler_feval(which_port, handler)
+        # Save handler object in class so it's not garbage collected
+        self.__msg_handlers[which_port] = handler
+
 ########################################################################
 # Wrappers for the user to inherit from
 ########################################################################
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_python_message_passing.py b/gnuradio-core/src/python/gnuradio/gr/qa_python_message_passing.py
new file mode 100644
index 0000000000..06bb96947b
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_python_message_passing.py
@@ -0,0 +1,123 @@
+#!/usr/bin/env python
+#
+# Copyright 2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING.  If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+try: import pmt
+except: from gruel import pmt
+import numpy
+import time
+
+# Simple block to generate messages
+class message_generator(gr.sync_block):
+    def __init__(self, msg_list, msg_interval):
+        gr.sync_block.__init__(
+            self,
+            name = "message generator",
+            in_sig = [numpy.float32],
+            out_sig = None
+        )
+        self.msg_list = msg_list
+        self.msg_interval = msg_interval
+        self.msg_ctr = 0
+        self.message_port_register_out(pmt.pmt_intern('out_port'))
+
+
+    def work(self, input_items, output_items):
+        inLen = len(input_items[0])
+        while self.msg_ctr < len(self.msg_list) and \
+                (self.msg_ctr * self.msg_interval) < \
+                (self.nitems_read(0) + inLen):
+            self.message_port_pub(pmt.pmt_intern('out_port'),
+                                  self.msg_list[self.msg_ctr])
+            self.msg_ctr += 1
+        return inLen
+
+# Simple block to consume messages
+class message_consumer(gr.sync_block):
+    def __init__(self):
+        gr.sync_block.__init__(
+            self,
+            name = "message consumer",
+            in_sig = None,
+            out_sig = None
+        )
+        self.msg_list = []
+        self.message_port_register_in(pmt.pmt_intern('in_port'))
+        self.set_msg_handler(pmt.pmt_intern('in_port'),
+                             self.handle_msg)
+
+    def handle_msg(self, msg):
+        # Create a new PMT from long value and put in list
+        self.msg_list.append(pmt.pmt_from_long(pmt.pmt_to_long(msg)))
+
+class test_python_message_passing(gr_unittest.TestCase):
+    
+    def setUp(self):
+        self.tb = gr.top_block()
+
+    def tearDown(self):
+        self.tb = None
+
+    def test_000(self):
+        num_msgs = 10
+        msg_interval = 1000
+        msg_list = []
+        for i in range(num_msgs):
+            msg_list.append(pmt.pmt_from_long(i))
+
+        # Create vector source with dummy data to trigger messages
+        src_data = []
+        for i in range(num_msgs*msg_interval):
+            src_data.append(float(i))
+        src = gr.vector_source_f(src_data, False)
+        msg_gen = message_generator(msg_list, msg_interval)
+        msg_cons = message_consumer()
+        
+        # Connect vector source to message gen
+        self.tb.connect(src, msg_gen)
+        
+        # Connect message generator to message consumer
+        self.tb.msg_connect(msg_gen, 'out_port', msg_cons, 'in_port')
+
+        # Verify that the messgae port query functions work
+        self.assertEqual(pmt.pmt_symbol_to_string(pmt.pmt_vector_ref(
+                    msg_gen.message_ports_out(), 0)), 'out_port')
+        self.assertEqual(pmt.pmt_symbol_to_string(pmt.pmt_vector_ref(
+                    msg_cons.message_ports_in(), 0)), 'in_port')
+        
+        # Run to verify message passing
+        self.tb.start()
+
+        # Wait for all messages to be sent
+        while msg_gen.msg_ctr < num_msgs:
+            time.sleep(0.5)
+        self.tb.stop()
+        self.tb.wait()               
+        
+        # Verify that the message consumer got all the messages
+        self.assertEqual(num_msgs, len(msg_cons.msg_list))
+        for i in range(num_msgs):
+            self.assertTrue(pmt.pmt_equal(msg_list[i], msg_cons.msg_list[i]))
+        
+if __name__ == '__main__':
+    gr_unittest.run(test_python_message_passing, 
+                    'test_python_message_passing.xml')
-- 
cgit v1.2.3