diff options
Diffstat (limited to 'gnuradio-core/src/lib')
-rw-r--r-- | gnuradio-core/src/lib/general/gr_block_gateway.h | 52 | ||||
-rw-r--r-- | gnuradio-core/src/lib/general/gr_feval.cc | 16 | ||||
-rw-r--r-- | gnuradio-core/src/lib/general/gr_feval.h | 29 | ||||
-rw-r--r-- | gnuradio-core/src/lib/general/gr_feval.i | 29 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_basic_block.h | 25 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block.cc | 5 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block.h | 20 | ||||
-rw-r--r-- | gnuradio-core/src/lib/runtime/gr_block_executor.cc | 29 |
8 files changed, 175 insertions, 30 deletions
diff --git a/gnuradio-core/src/lib/general/gr_block_gateway.h b/gnuradio-core/src/lib/general/gr_block_gateway.h index ae91d41b59..ce87a76c25 100644 --- a/gnuradio-core/src/lib/general/gr_block_gateway.h +++ b/gnuradio-core/src/lib/general/gr_block_gateway.h @@ -188,6 +188,58 @@ public: gr_block::get_tags_in_range(tags, which_input, abs_start, abs_end, key); return tags; } + + /* Message passing interface */ + void gr_block__message_port_register_in(pmt::pmt_t port_id){ + gr_basic_block::message_port_register_in(port_id); + } + + void gr_block__message_port_register_out(pmt::pmt_t port_id){ + gr_basic_block::message_port_register_out(port_id); + } + + void gr_block__message_port_pub(pmt::pmt_t port_id, pmt::pmt_t msg){ + gr_basic_block::message_port_pub(port_id, msg); + } + + void gr_block__message_port_sub(pmt::pmt_t port_id, pmt::pmt_t target){ + gr_basic_block::message_port_sub(port_id, target); + } + + void gr_block__message_port_unsub(pmt::pmt_t port_id, pmt::pmt_t target){ + gr_basic_block::message_port_unsub(port_id, target); + } + + pmt::pmt_t gr_block__message_ports_in(){ + return gr_basic_block::message_ports_in(); + } + + pmt::pmt_t gr_block__message_ports_out(){ + return gr_basic_block::message_ports_out(); + } + + void set_msg_handler_feval(pmt::pmt_t which_port, gr_feval_p *msg_handler) + { + if(msg_queue.find(which_port) == msg_queue.end()){ + throw std::runtime_error("attempt to set_msg_handler_feval() on bad input message port!"); + } + d_msg_handlers_feval[which_port] = msg_handler; + } + +protected: + typedef std::map<pmt::pmt_t, gr_feval_p *, pmt::comperator> msg_handlers_feval_t; + msg_handlers_feval_t d_msg_handlers_feval; + + void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg){ + // Is there a handler? + if (d_msg_handlers_feval.find(which_port) != d_msg_handlers_feval.end()){ + d_msg_handlers_feval[which_port]->calleval(msg); // Yes, invoke it. + } + else { + // Pass to generic dispatcher if not found + gr_basic_block::dispatch_msg(which_port, msg); + } + } }; /*! diff --git a/gnuradio-core/src/lib/general/gr_feval.cc b/gnuradio-core/src/lib/general/gr_feval.cc index ca5714a796..89f09984cf 100644 --- a/gnuradio-core/src/lib/general/gr_feval.cc +++ b/gnuradio-core/src/lib/general/gr_feval.cc @@ -88,6 +88,22 @@ gr_feval::calleval(void) eval(); } +// ---------------------------------------------------------------- + +gr_feval_p::~gr_feval_p(){} + +void +gr_feval_p::eval(pmt::pmt_t x) +{ + // nop +} + +void +gr_feval_p::calleval(pmt::pmt_t x) +{ + eval(x); +} + /* * Trivial examples showing C++ (transparently) calling Python */ diff --git a/gnuradio-core/src/lib/general/gr_feval.h b/gnuradio-core/src/lib/general/gr_feval.h index 1726a8a7f9..a9bccfe51c 100644 --- a/gnuradio-core/src/lib/general/gr_feval.h +++ b/gnuradio-core/src/lib/general/gr_feval.h @@ -24,6 +24,7 @@ #include <gr_core_api.h> #include <gr_complex.h> +#include <gruel/pmt.h> /*! * \brief base class for evaluating a function: double -> double @@ -138,6 +139,34 @@ public: }; /*! + * \brief base class for evaluating a function: pmt -> void + * \ingroup misc + * + * This class is designed to be subclassed in Python or C++ + * and is callable from both places. It uses SWIG's + * "director" feature to implement the magic. + * It's slow. Don't use it in a performance critical path. + * + * Override eval to define the behavior. + * Use calleval to invoke eval (this kludge is required to allow a + * python specific "shim" to be inserted. + */ +class GR_CORE_API gr_feval_p +{ +protected: + /*! + * \brief override this to define the function + */ + virtual void eval(pmt::pmt_t x); + +public: + gr_feval_p() {} + virtual ~gr_feval_p(); + + virtual void calleval(pmt::pmt_t x); // invoke "eval" +}; + +/*! * \brief trivial examples / test cases showing C++ calling Python code */ GR_CORE_API double gr_feval_dd_example(gr_feval_dd *f, double x); diff --git a/gnuradio-core/src/lib/general/gr_feval.i b/gnuradio-core/src/lib/general/gr_feval.i index bc219a6431..bcf4f1e646 100644 --- a/gnuradio-core/src/lib/general/gr_feval.i +++ b/gnuradio-core/src/lib/general/gr_feval.i @@ -45,23 +45,28 @@ // Directors are only supported in Python, Java and C# #ifdef SWIGPYTHON +%include "pmt_swig.i" +using namespace pmt; // Enable SWIG directors for these classes %feature("director") gr_py_feval_dd; %feature("director") gr_py_feval_cc; %feature("director") gr_py_feval_ll; %feature("director") gr_py_feval; +%feature("director") gr_py_feval_p; %feature("nodirector") gr_py_feval_dd::calleval; %feature("nodirector") gr_py_feval_cc::calleval; %feature("nodirector") gr_py_feval_ll::calleval; %feature("nodirector") gr_py_feval::calleval; +%feature("nodirector") gr_py_feval_p::calleval; %rename(feval_dd) gr_py_feval_dd; %rename(feval_cc) gr_py_feval_cc; %rename(feval_ll) gr_py_feval_ll; %rename(feval) gr_py_feval; +%rename(feval_p) gr_py_feval_p; //%exception { // try { $action } @@ -136,12 +141,26 @@ public: virtual void calleval(); }; +%ignore gr_feval_p; +class gr_feval_p +{ +protected: + virtual void eval(pmt_t x); + +public: + gr_feval_p() {} + virtual ~gr_feval_p(); + + virtual void calleval(pmt_t x); +}; + /* * These are the ones to derive from in Python. They have the magic shim * that ensures that we're holding the Python GIL when we enter Python land... */ %inline %{ +#include <gruel/pmt.h> class gr_py_feval_dd : public gr_feval_dd { @@ -183,6 +202,16 @@ class gr_py_feval : public gr_feval } }; +class gr_py_feval_p : public gr_feval_p +{ + public: + void calleval(pmt::pmt_t x) + { + ensure_py_gil_state _lock; + eval(x); + } +}; + %} diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h index d4f5cb5941..de56954c54 100644 --- a/gnuradio-core/src/lib/runtime/gr_basic_block.h +++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h @@ -58,18 +58,6 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_ typedef boost::function<void(pmt::pmt_t)> msg_handler_t; private: - /* - * This function is called by the runtime system to dispatch messages. - * - * The thread-safety guarantees mentioned in set_msg_handler are implemented - * by the callers of this method. - */ - void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg) - { - // AA Update this - if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler? - d_msg_handlers[which_port](msg); // Yes, invoke it. - }; //msg_handler_t d_msg_handler; typedef std::map<pmt::pmt_t , msg_handler_t, pmt::comperator> d_msg_handlers_t; @@ -124,6 +112,19 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_ */ void set_color(vcolor color) { d_color = color; } vcolor color() const { return d_color; } + + /* + * This function is called by the runtime system to dispatch messages. + * + * The thread-safety guarantees mentioned in set_msg_handler are implemented + * by the callers of this method. + */ + virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg) + { + // AA Update this + if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler? + d_msg_handlers[which_port](msg); // Yes, invoke it. + }; // Message passing interface pmt::pmt_t message_subscribers; diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc index 33c460e1a0..83bbea37e6 100644 --- a/gnuradio-core/src/lib/runtime/gr_block.cc +++ b/gnuradio-core/src/lib/runtime/gr_block.cc @@ -31,8 +31,8 @@ #include <gr_block_registry.h> gr_block::gr_block (const std::string &name, - gr_io_signature_sptr input_signature, - gr_io_signature_sptr output_signature) + gr_io_signature_sptr input_signature, + gr_io_signature_sptr output_signature) : gr_basic_block(name, input_signature, output_signature), d_output_multiple (1), d_output_multiple_set(false), @@ -43,6 +43,7 @@ gr_block::gr_block (const std::string &name, d_fixed_rate(false), d_max_noutput_items_set(false), d_max_noutput_items(0), + d_min_noutput_items(0), d_tag_propagation_policy(TPP_ALL_TO_ALL), d_pc_rpc_set(false), d_max_output_buffer(std::max(output_signature->max_streams(),1), -1), diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h index 96e07439ef..ee69309169 100644 --- a/gnuradio-core/src/lib/runtime/gr_block.h +++ b/gnuradio-core/src/lib/runtime/gr_block.h @@ -252,13 +252,30 @@ class GR_CORE_API gr_block : public gr_basic_block { void set_tag_propagation_policy(tag_propagation_policy_t p); /*! + * \brief Return the minimum number of output items this block can + * produce during a call to work. + * + * Should be 0 for most blocks. Useful if we're dealing with packets and + * the block produces one packet per call to work. + */ + int min_noutput_items() const { return d_min_noutput_items; } + + /*! + * \brief Set the minimum number of output items this block can + * produce during a call to work. + * + * \param m the minimum noutput_items this block can produce. + */ + void set_min_noutput_items(int m) { d_min_noutput_items = m; } + + /*! * \brief Return the maximum number of output items this block will * handle during a call to work. */ int max_noutput_items(); /*! - * \brief Set the maximum number of ouput items htis block will + * \brief Set the maximum number of output items this block will * handle during a call to work. * * \param m the maximum noutput_items this block will handle. @@ -446,6 +463,7 @@ class GR_CORE_API gr_block : public gr_basic_block { gr_block_detail_sptr d_detail; // implementation details unsigned d_history; bool d_fixed_rate; + int d_min_noutput_items; bool d_max_noutput_items_set; // if d_max_noutput_items is valid int d_max_noutput_items; // value of max_noutput_items for this block tag_propagation_policy_t d_tag_propagation_policy; // policy for moving tags downstream diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.cc b/gnuradio-core/src/lib/runtime/gr_block_executor.cc index ee0ab9e378..27f591452d 100644 --- a/gnuradio-core/src/lib/runtime/gr_block_executor.cc +++ b/gnuradio-core/src/lib/runtime/gr_block_executor.cc @@ -64,22 +64,21 @@ round_down (unsigned int n, unsigned int multiple) // on is done. // static int -min_available_space (gr_block_detail *d, int output_multiple) +min_available_space (gr_block_detail *d, int output_multiple, int min_noutput_items) { - int min_space = std::numeric_limits<int>::max(); - + int min_space = std::numeric_limits<int>::max(); + if (min_noutput_items == 0) + min_noutput_items = 1; for (int i = 0; i < d->noutputs (); i++){ gruel::scoped_lock guard(*d->output(i)->mutex()); -#if 0 - int n = round_down(d->output(i)->space_available(), output_multiple); -#else - int n = round_down(std::min(d->output(i)->space_available(), - d->output(i)->bufsize()/2), - output_multiple); -#endif - if (n == 0){ // We're blocked on output. - if (d->output(i)->done()){ // Downstream is done, therefore we're done. - return -1; + int avail_n = round_down(d->output(i)->space_available(), output_multiple); + int best_n = round_down(d->output(i)->bufsize()/2, output_multiple); + if (best_n < min_noutput_items) + throw std::runtime_error("Buffer too small for min_noutput_items"); + int n = std::min(avail_n, best_n); + if (n < min_noutput_items){ // We're blocked on output. + if (d->output(i)->done()){ // Downstream is done, therefore we're done. + return -1; } return 0; } @@ -205,7 +204,7 @@ gr_block_executor::run_one_iteration() d_start_nitems_read.resize(0); // determine the minimum available output space - noutput_items = min_available_space (d, m->output_multiple ()); + noutput_items = min_available_space (d, m->output_multiple (), m->min_noutput_items ()); noutput_items = std::min(noutput_items, max_noutput_items); LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl); if (noutput_items == -1) // we're done @@ -286,7 +285,7 @@ gr_block_executor::run_one_iteration() } // determine the minimum available output space - noutput_items = min_available_space (d, m->output_multiple ()); + noutput_items = min_available_space (d, m->output_multiple (), m->min_noutput_items ()); if (ENABLE_LOGGING){ *d_log << " regular "; if (m->relative_rate() >= 1.0) |