diff options
author | Tom Rondeau <trondeau@vt.edu> | 2013-02-13 13:34:58 -0500 |
---|---|---|
committer | Tom Rondeau <trondeau@vt.edu> | 2013-02-13 13:34:58 -0500 |
commit | da8abe401ae3859b07c48fb2a5dab4717b6c15cc (patch) | |
tree | 357eef8306745eabbb7a41b8d81ffe8db7ff7e6e /gnuradio-core/src | |
parent | a482bd9a29ba643ff148ae392dc359ff53dd7fb9 (diff) | |
parent | af7d55fda43746ae187bc520952eacf420f8363f (diff) |
Merge branch 'next' of gnuradio.org:gnuradio into next
Diffstat (limited to 'gnuradio-core/src')
-rw-r--r-- | gnuradio-core/src/lib/general/gr_block_gateway.h | 52 | ||||
-rw-r--r-- | gnuradio-core/src/lib/general/gr_feval.cc | 16 | ||||
-rw-r--r-- | gnuradio-core/src/lib/general/gr_feval.h | 29 | ||||
-rw-r--r-- | gnuradio-core/src/lib/general/gr_feval.i | 29 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_basic_block.h | 25 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block.cc | 5 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block.h | 20 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block_executor.cc | 29 | ||||
-rw-r--r-- | gnuradio-core/src/python/gnuradio/gr/gateway.py | 28 | ||||
-rw-r--r-- | gnuradio-core/src/python/gnuradio/gr/qa_python_message_passing.py | 123 |
10 files changed, 326 insertions, 30 deletions
diff --git a/gnuradio-core/src/lib/general/gr_block_gateway.h b/gnuradio-core/src/lib/general/gr_block_gateway.h index ae91d41b59..ce87a76c25 100644 --- a/gnuradio-core/src/lib/general/gr_block_gateway.h +++ b/gnuradio-core/src/lib/general/gr_block_gateway.h @@ -188,6 +188,58 @@ public: gr_block::get_tags_in_range(tags, which_input, abs_start, abs_end, key); return tags; } + + /* Message passing interface */ + void gr_block__message_port_register_in(pmt::pmt_t port_id){ + gr_basic_block::message_port_register_in(port_id); + } + + void gr_block__message_port_register_out(pmt::pmt_t port_id){ + gr_basic_block::message_port_register_out(port_id); + } + + void gr_block__message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg){ + gr_basic_block::message_port_pub(port_id, msg); + } + + void gr_block__message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target){ + gr_basic_block::message_port_sub(port_id, target); + } + + void gr_block__message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target){ + gr_basic_block::message_port_unsub(port_id, target); + } + + pmt::pmt_t gr_block__message_ports_in(){ + return gr_basic_block::message_ports_in(); + } + + pmt::pmt_t gr_block__message_ports_out(){ + return gr_basic_block::message_ports_out(); + } + + void set_msg_handler_feval(pmt::pmt_t which_port, gr_feval_p *msg_handler) + { + if(msg_queue.find(which_port) == msg_queue.end()){ + throw std::runtime_error("attempt to set_msg_handler_feval() on bad input message port!"); + } + d_msg_handlers_feval[which_port] = msg_handler; + } + +protected: + typedef std::map<pmt::pmt_t, gr_feval_p *, pmt::comperator> msg_handlers_feval_t; + msg_handlers_feval_t d_msg_handlers_feval; + + void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg){ + // Is there a handler? + if (d_msg_handlers_feval.find(which_port) != d_msg_handlers_feval.end()){ + d_msg_handlers_feval[which_port]->calleval(msg); // Yes, invoke it. + } + else { + // Pass to generic dispatcher if not found + gr_basic_block::dispatch_msg(which_port, msg); + } + } }; /*! diff --git a/gnuradio-core/src/lib/general/gr_feval.cc b/gnuradio-core/src/lib/general/gr_feval.cc index ca5714a796..89f09984cf 100644 --- a/gnuradio-core/src/lib/general/gr_feval.cc +++ b/gnuradio-core/src/lib/general/gr_feval.cc @@ -88,6 +88,22 @@ gr_feval::calleval(void) eval(); } +// ---------------------------------------------------------------- + +gr_feval_p::~gr_feval_p(){} + +void +gr_feval_p::eval(pmt::pmt_t x) +{ + // nop +} + +void +gr_feval_p::calleval(pmt::pmt_t x) +{ + eval(x); +} + /* * Trivial examples showing C++ (transparently) calling Python */ diff --git a/gnuradio-core/src/lib/general/gr_feval.h b/gnuradio-core/src/lib/general/gr_feval.h index 1726a8a7f9..a9bccfe51c 100644 --- a/gnuradio-core/src/lib/general/gr_feval.h +++ b/gnuradio-core/src/lib/general/gr_feval.h @@ -24,6 +24,7 @@ #include <gr_core_api.h> #include <gr_complex.h> +#include <gruel/pmt.h> /*! * \brief base class for evaluating a function: double -> double @@ -138,6 +139,34 @@ public: }; /*! + * \brief base class for evaluating a function: pmt -> void + * \ingroup misc + * + * This class is designed to be subclassed in Python or C++ + * and is callable from both places. It uses SWIG's + * "director" feature to implement the magic. + * It's slow. Don't use it in a performance critical path. + * + * Override eval to define the behavior. + * Use calleval to invoke eval (this kludge is required to allow a + * python specific "shim" to be inserted. + */ +class GR_CORE_API gr_feval_p +{ +protected: + /*! + * \brief override this to define the function + */ + virtual void eval(pmt::pmt_t x); + +public: + gr_feval_p() {} + virtual ~gr_feval_p(); + + virtual void calleval(pmt::pmt_t x); // invoke "eval" +}; + +/*! * \brief trivial examples / test cases showing C++ calling Python code */ GR_CORE_API double gr_feval_dd_example(gr_feval_dd *f, double x); diff --git a/gnuradio-core/src/lib/general/gr_feval.i b/gnuradio-core/src/lib/general/gr_feval.i index bc219a6431..bcf4f1e646 100644 --- a/gnuradio-core/src/lib/general/gr_feval.i +++ b/gnuradio-core/src/lib/general/gr_feval.i @@ -45,23 +45,28 @@ // Directors are only supported in Python, Java and C# #ifdef SWIGPYTHON +%include "pmt_swig.i" +using namespace pmt; // Enable SWIG directors for these classes %feature("director") gr_py_feval_dd; %feature("director") gr_py_feval_cc; %feature("director") gr_py_feval_ll; %feature("director") gr_py_feval; +%feature("director") gr_py_feval_p; %feature("nodirector") gr_py_feval_dd::calleval; %feature("nodirector") gr_py_feval_cc::calleval; %feature("nodirector") gr_py_feval_ll::calleval; %feature("nodirector") gr_py_feval::calleval; +%feature("nodirector") gr_py_feval_p::calleval; %rename(feval_dd) gr_py_feval_dd; %rename(feval_cc) gr_py_feval_cc; %rename(feval_ll) gr_py_feval_ll; %rename(feval) gr_py_feval; +%rename(feval_p) gr_py_feval_p; //%exception { // try { $action } @@ -136,12 +141,26 @@ public: virtual void calleval(); }; +%ignore gr_feval_p; +class gr_feval_p +{ +protected: + virtual void eval(pmt_t x); + +public: + gr_feval_p() {} + virtual ~gr_feval_p(); + + virtual void calleval(pmt_t x); +}; + /* * These are the ones to derive from in Python. They have the magic shim * that ensures that we're holding the Python GIL when we enter Python land... */ %inline %{ +#include <gruel/pmt.h> class gr_py_feval_dd : public gr_feval_dd { @@ -183,6 +202,16 @@ class gr_py_feval : public gr_feval } }; +class gr_py_feval_p : public gr_feval_p +{ + public: + void calleval(pmt::pmt_t x) + { + ensure_py_gil_state _lock; + eval(x); + } +}; + %} diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h index d4f5cb5941..de56954c54 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.h +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h @@ -58,18 +58,6 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_ typedef boost::function<void(pmt::pmt_t)> msg_handler_t; private: - /* - * This function is called by the runtime system to dispatch messages. - * - * The thread-safety guarantees mentioned in set_msg_handler are implemented - * by the callers of this method. - */ - void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg) - { - // AA Update this - if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler? - d_msg_handlers[which_port](msg); // Yes, invoke it. - }; //msg_handler_t d_msg_handler; typedef std::map<pmt::pmt_t , msg_handler_t, pmt::comperator> d_msg_handlers_t; @@ -124,6 +112,19 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_ */ void set_color(vcolor color) { d_color = color; } vcolor color() const { return d_color; } + + /* + * This function is called by the runtime system to dispatch messages. + * + * The thread-safety guarantees mentioned in set_msg_handler are implemented + * by the callers of this method. + */ + virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg) + { + // AA Update this + if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler? + d_msg_handlers[which_port](msg); // Yes, invoke it. + }; // Message passing interface pmt::pmt_t message_subscribers; diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc index 33c460e1a0..83bbea37e6 100644 --- a/gnuradio-core/src/lib/runtime/gr_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_block.cc @@ -31,8 +31,8 @@ #include <gr_block_registry.h> gr_block::gr_block (const std::string &name, - gr_io_signature_sptr input_signature, - gr_io_signature_sptr output_signature) + gr_io_signature_sptr input_signature, + gr_io_signature_sptr output_signature) : gr_basic_block(name, input_signature, output_signature), d_output_multiple (1), d_output_multiple_set(false), @@ -43,6 +43,7 @@ gr_block::gr_block (const std::string &name, d_fixed_rate(false), d_max_noutput_items_set(false), d_max_noutput_items(0), + d_min_noutput_items(0), d_tag_propagation_policy(TPP_ALL_TO_ALL), d_pc_rpc_set(false), d_max_output_buffer(std::max(output_signature->max_streams(),1), -1), diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h index 96e07439ef..ee69309169 100644 --- a/gnuradio-core/src/lib/runtime/gr_block.h +++ b/gnuradio-core/src/lib/runtime/gr_block.h @@ -252,13 +252,30 @@ class GR_CORE_API gr_block : public gr_basic_block { void set_tag_propagation_policy(tag_propagation_policy_t p); /*! + * \brief Return the minimum number of output items this block can + * produce during a call to work. + * + * Should be 0 for most blocks. Useful if we're dealing with packets and + * the block produces one packet per call to work. + */ + int min_noutput_items() const { return d_min_noutput_items; } + + /*! + * \brief Set the minimum number of output items this block can + * produce during a call to work. + * + * \param m the minimum noutput_items this block can produce. + */ + void set_min_noutput_items(int m) { d_min_noutput_items = m; } + + /*! * \brief Return the maximum number of output items this block will * handle during a call to work. */ int max_noutput_items(); /*! - * \brief Set the maximum number of ouput items htis block will + * \brief Set the maximum number of output items this block will * handle during a call to work. * * \param m the maximum noutput_items this block will handle. @@ -446,6 +463,7 @@ class GR_CORE_API gr_block : public gr_basic_block { gr_block_detail_sptr d_detail; // implementation details unsigned d_history; bool d_fixed_rate; + int d_min_noutput_items; bool d_max_noutput_items_set; // if d_max_noutput_items is valid int d_max_noutput_items; // value of max_noutput_items for this block tag_propagation_policy_t d_tag_propagation_policy; // policy for moving tags downstream diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.cc b/gnuradio-core/src/lib/runtime/gr_block_executor.cc index ee0ab9e378..27f591452d 100644 --- a/gnuradio-core/src/lib/runtime/gr_block_executor.cc +++ b/gnuradio-core/src/lib/runtime/gr_block_executor.cc @@ -64,22 +64,21 @@ round_down (unsigned int n, unsigned int multiple) // on is done. // static int -min_available_space (gr_block_detail *d, int output_multiple) +min_available_space (gr_block_detail *d, int output_multiple, int min_noutput_items) { - int min_space = std::numeric_limits<int>::max(); - + int min_space = std::numeric_limits<int>::max(); + if (min_noutput_items == 0) + min_noutput_items = 1; for (int i = 0; i < d->noutputs (); i++){ gruel::scoped_lock guard(*d->output(i)->mutex()); -#if 0 - int n = round_down(d->output(i)->space_available(), output_multiple); -#else - int n = round_down(std::min(d->output(i)->space_available(), - d->output(i)->bufsize()/2), - output_multiple); -#endif - if (n == 0){ // We're blocked on output. - if (d->output(i)->done()){ // Downstream is done, therefore we're done. - return -1; + int avail_n = round_down(d->output(i)->space_available(), output_multiple); + int best_n = round_down(d->output(i)->bufsize()/2, output_multiple); + if (best_n < min_noutput_items) + throw std::runtime_error("Buffer too small for min_noutput_items"); + int n = std::min(avail_n, best_n); + if (n < min_noutput_items){ // We're blocked on output. + if (d->output(i)->done()){ // Downstream is done, therefore we're done. + return -1; } return 0; } @@ -205,7 +204,7 @@ gr_block_executor::run_one_iteration() d_start_nitems_read.resize(0); // determine the minimum available output space - noutput_items = min_available_space (d, m->output_multiple ()); + noutput_items = min_available_space (d, m->output_multiple (), m->min_noutput_items ()); noutput_items = std::min(noutput_items, max_noutput_items); LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl); if (noutput_items == -1) // we're done @@ -286,7 +285,7 @@ gr_block_executor::run_one_iteration() } // determine the minimum available output space - noutput_items = min_available_space (d, m->output_multiple ()); + noutput_items = min_available_space (d, m->output_multiple (), m->min_noutput_items ()); if (ENABLE_LOGGING){ *d_log << " regular "; if (m->relative_rate() >= 1.0) diff --git a/gnuradio-core/src/python/gnuradio/gr/gateway.py b/gnuradio-core/src/python/gnuradio/gr/gateway.py index 244b8b5925..c25755bb57 100644 --- a/gnuradio-core/src/python/gnuradio/gr/gateway.py +++ b/gnuradio-core/src/python/gnuradio/gr/gateway.py @@ -60,6 +60,24 @@ class gateway_handler(gr_core.feval_ll): return 0 ######################################################################## +# Handler that does callbacks from C++ +######################################################################## +class msg_handler(gr_core.feval_p): + + #dont put a constructor, it wont work + + def init(self, callback): + self._callback = callback + + def eval(self, arg): + try: self._callback(arg) + except Exception as ex: + print("handler caught exception: %s"%ex) + import traceback; traceback.print_exc() + raise ex + return 0 + +######################################################################## # The guts that make this into a gr block ######################################################################## class gateway_block(object): @@ -91,6 +109,9 @@ class gateway_block(object): self.__handler, name, gr_in_sig, gr_out_sig, work_type, factor) self.__message = self.__gateway.gr_block_message() + #dict to keep references to all message handlers + self.__msg_handlers = {} + #register gr_block functions prefix = 'gr_block__' for attr in [x for x in dir(self.__gateway) if x.startswith(prefix)]: @@ -171,6 +192,13 @@ class gateway_block(object): def start(self): return True def stop(self): return True + def set_msg_handler(self, which_port, handler_func): + handler = msg_handler() + handler.init(handler_func) + self.__gateway.set_msg_handler_feval(which_port, handler) + # Save handler object in class so it's not garbage collected + self.__msg_handlers[which_port] = handler + ######################################################################## # Wrappers for the user to inherit from ######################################################################## diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_python_message_passing.py b/gnuradio-core/src/python/gnuradio/gr/qa_python_message_passing.py new file mode 100644 index 0000000000..06bb96947b --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_python_message_passing.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +try: import pmt +except: from gruel import pmt +import numpy +import time + +# Simple block to generate messages +class message_generator(gr.sync_block): + def __init__(self, msg_list, msg_interval): + gr.sync_block.__init__( + self, + name = "message generator", + in_sig = [numpy.float32], + out_sig = None + ) + self.msg_list = msg_list + self.msg_interval = msg_interval + self.msg_ctr = 0 + self.message_port_register_out(pmt.pmt_intern('out_port')) + + + def work(self, input_items, output_items): + inLen = len(input_items[0]) + while self.msg_ctr < len(self.msg_list) and \ + (self.msg_ctr * self.msg_interval) < \ + (self.nitems_read(0) + inLen): + self.message_port_pub(pmt.pmt_intern('out_port'), + self.msg_list[self.msg_ctr]) + self.msg_ctr += 1 + return inLen + +# Simple block to consume messages +class message_consumer(gr.sync_block): + def __init__(self): + gr.sync_block.__init__( + self, + name = "message consumer", + in_sig = None, + out_sig = None + ) + self.msg_list = [] + self.message_port_register_in(pmt.pmt_intern('in_port')) + self.set_msg_handler(pmt.pmt_intern('in_port'), + self.handle_msg) + + def handle_msg(self, msg): + # Create a new PMT from long value and put in list + self.msg_list.append(pmt.pmt_from_long(pmt.pmt_to_long(msg))) + +class test_python_message_passing(gr_unittest.TestCase): + + def setUp(self): + self.tb = gr.top_block() + + def tearDown(self): + self.tb = None + + def test_000(self): + num_msgs = 10 + msg_interval = 1000 + msg_list = [] + for i in range(num_msgs): + msg_list.append(pmt.pmt_from_long(i)) + + # Create vector source with dummy data to trigger messages + src_data = [] + for i in range(num_msgs*msg_interval): + src_data.append(float(i)) + src = gr.vector_source_f(src_data, False) + msg_gen = message_generator(msg_list, msg_interval) + msg_cons = message_consumer() + + # Connect vector source to message gen + self.tb.connect(src, msg_gen) + + # Connect message generator to message consumer + self.tb.msg_connect(msg_gen, 'out_port', msg_cons, 'in_port') + + # Verify that the messgae port query functions work + self.assertEqual(pmt.pmt_symbol_to_string(pmt.pmt_vector_ref( + msg_gen.message_ports_out(), 0)), 'out_port') + self.assertEqual(pmt.pmt_symbol_to_string(pmt.pmt_vector_ref( + msg_cons.message_ports_in(), 0)), 'in_port') + + # Run to verify message passing + self.tb.start() + + # Wait for all messages to be sent + while msg_gen.msg_ctr < num_msgs: + time.sleep(0.5) + self.tb.stop() + self.tb.wait() + + # Verify that the message consumer got all the messages + self.assertEqual(num_msgs, len(msg_cons.msg_list)) + for i in range(num_msgs): + self.assertTrue(pmt.pmt_equal(msg_list[i], msg_cons.msg_list[i])) + +if __name__ == '__main__': + gr_unittest.run(test_python_message_passing, + 'test_python_message_passing.xml') |