summaryrefslogtreecommitdiff
path: root/gnuradio-core/src/lib/runtime
diff options
context:
space:
mode:
authorTom Rondeau <trondeau@vt.edu>2013-02-13 13:34:58 -0500
committerTom Rondeau <trondeau@vt.edu>2013-02-13 13:34:58 -0500
commitda8abe401ae3859b07c48fb2a5dab4717b6c15cc (patch)
tree357eef8306745eabbb7a41b8d81ffe8db7ff7e6e /gnuradio-core/src/lib/runtime
parenta482bd9a29ba643ff148ae392dc359ff53dd7fb9 (diff)
parentaf7d55fda43746ae187bc520952eacf420f8363f (diff)
Merge branch 'next' of gnuradio.org:gnuradio into next
Diffstat (limited to 'gnuradio-core/src/lib/runtime')
-rw-r--r--gnuradio-core/src/lib/runtime/gr_basic_block.h25
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.cc5
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block.h20
-rw-r--r--gnuradio-core/src/lib/runtime/gr_block_executor.cc29
4 files changed, 49 insertions, 30 deletions
diff --git a/gnuradio-core/src/lib/runtime/gr_basic_block.h b/gnuradio-core/src/lib/runtime/gr_basic_block.h
index d4f5cb5941..de56954c54 100644
--- a/gnuradio-core/src/lib/runtime/gr_basic_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_basic_block.h
@@ -58,18 +58,6 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_
typedef boost::function<void(pmt::pmt_t)> msg_handler_t;
private:
- /*
- * This function is called by the runtime system to dispatch messages.
- *
- * The thread-safety guarantees mentioned in set_msg_handler are implemented
- * by the callers of this method.
- */
- void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
- {
- // AA Update this
- if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
- d_msg_handlers[which_port](msg); // Yes, invoke it.
- };
//msg_handler_t d_msg_handler;
typedef std::map<pmt::pmt_t , msg_handler_t, pmt::comperator> d_msg_handlers_t;
@@ -124,6 +112,19 @@ class GR_CORE_API gr_basic_block : public gr_msg_accepter, public boost::enable_
*/
void set_color(vcolor color) { d_color = color; }
vcolor color() const { return d_color; }
+
+ /*
+ * This function is called by the runtime system to dispatch messages.
+ *
+ * The thread-safety guarantees mentioned in set_msg_handler are implemented
+ * by the callers of this method.
+ */
+ virtual void dispatch_msg(pmt::pmt_t which_port, pmt::pmt_t msg)
+ {
+ // AA Update this
+ if (d_msg_handlers.find(which_port) != d_msg_handlers.end()) // Is there a handler?
+ d_msg_handlers[which_port](msg); // Yes, invoke it.
+ };
// Message passing interface
pmt::pmt_t message_subscribers;
diff --git a/gnuradio-core/src/lib/runtime/gr_block.cc b/gnuradio-core/src/lib/runtime/gr_block.cc
index 33c460e1a0..83bbea37e6 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block.cc
@@ -31,8 +31,8 @@
#include <gr_block_registry.h>
gr_block::gr_block (const std::string &name,
- gr_io_signature_sptr input_signature,
- gr_io_signature_sptr output_signature)
+ gr_io_signature_sptr input_signature,
+ gr_io_signature_sptr output_signature)
: gr_basic_block(name, input_signature, output_signature),
d_output_multiple (1),
d_output_multiple_set(false),
@@ -43,6 +43,7 @@ gr_block::gr_block (const std::string &name,
d_fixed_rate(false),
d_max_noutput_items_set(false),
d_max_noutput_items(0),
+ d_min_noutput_items(0),
d_tag_propagation_policy(TPP_ALL_TO_ALL),
d_pc_rpc_set(false),
d_max_output_buffer(std::max(output_signature->max_streams(),1), -1),
diff --git a/gnuradio-core/src/lib/runtime/gr_block.h b/gnuradio-core/src/lib/runtime/gr_block.h
index 96e07439ef..ee69309169 100644
--- a/gnuradio-core/src/lib/runtime/gr_block.h
+++ b/gnuradio-core/src/lib/runtime/gr_block.h
@@ -252,13 +252,30 @@ class GR_CORE_API gr_block : public gr_basic_block {
void set_tag_propagation_policy(tag_propagation_policy_t p);
/*!
+ * \brief Return the minimum number of output items this block can
+ * produce during a call to work.
+ *
+ * Should be 0 for most blocks. Useful if we're dealing with packets and
+ * the block produces one packet per call to work.
+ */
+ int min_noutput_items() const { return d_min_noutput_items; }
+
+ /*!
+ * \brief Set the minimum number of output items this block can
+ * produce during a call to work.
+ *
+ * \param m the minimum noutput_items this block can produce.
+ */
+ void set_min_noutput_items(int m) { d_min_noutput_items = m; }
+
+ /*!
* \brief Return the maximum number of output items this block will
* handle during a call to work.
*/
int max_noutput_items();
/*!
- * \brief Set the maximum number of ouput items htis block will
+ * \brief Set the maximum number of output items this block will
* handle during a call to work.
*
* \param m the maximum noutput_items this block will handle.
@@ -446,6 +463,7 @@ class GR_CORE_API gr_block : public gr_basic_block {
gr_block_detail_sptr d_detail; // implementation details
unsigned d_history;
bool d_fixed_rate;
+ int d_min_noutput_items;
bool d_max_noutput_items_set; // if d_max_noutput_items is valid
int d_max_noutput_items; // value of max_noutput_items for this block
tag_propagation_policy_t d_tag_propagation_policy; // policy for moving tags downstream
diff --git a/gnuradio-core/src/lib/runtime/gr_block_executor.cc b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
index ee0ab9e378..27f591452d 100644
--- a/gnuradio-core/src/lib/runtime/gr_block_executor.cc
+++ b/gnuradio-core/src/lib/runtime/gr_block_executor.cc
@@ -64,22 +64,21 @@ round_down (unsigned int n, unsigned int multiple)
// on is done.
//
static int
-min_available_space (gr_block_detail *d, int output_multiple)
+min_available_space (gr_block_detail *d, int output_multiple, int min_noutput_items)
{
- int min_space = std::numeric_limits<int>::max();
-
+ int min_space = std::numeric_limits<int>::max();
+ if (min_noutput_items == 0)
+ min_noutput_items = 1;
for (int i = 0; i < d->noutputs (); i++){
gruel::scoped_lock guard(*d->output(i)->mutex());
-#if 0
- int n = round_down(d->output(i)->space_available(), output_multiple);
-#else
- int n = round_down(std::min(d->output(i)->space_available(),
- d->output(i)->bufsize()/2),
- output_multiple);
-#endif
- if (n == 0){ // We're blocked on output.
- if (d->output(i)->done()){ // Downstream is done, therefore we're done.
- return -1;
+ int avail_n = round_down(d->output(i)->space_available(), output_multiple);
+ int best_n = round_down(d->output(i)->bufsize()/2, output_multiple);
+ if (best_n < min_noutput_items)
+ throw std::runtime_error("Buffer too small for min_noutput_items");
+ int n = std::min(avail_n, best_n);
+ if (n < min_noutput_items){ // We're blocked on output.
+ if (d->output(i)->done()){ // Downstream is done, therefore we're done.
+ return -1;
}
return 0;
}
@@ -205,7 +204,7 @@ gr_block_executor::run_one_iteration()
d_start_nitems_read.resize(0);
// determine the minimum available output space
- noutput_items = min_available_space (d, m->output_multiple ());
+ noutput_items = min_available_space (d, m->output_multiple (), m->min_noutput_items ());
noutput_items = std::min(noutput_items, max_noutput_items);
LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
if (noutput_items == -1) // we're done
@@ -286,7 +285,7 @@ gr_block_executor::run_one_iteration()
}
// determine the minimum available output space
- noutput_items = min_available_space (d, m->output_multiple ());
+ noutput_items = min_available_space (d, m->output_multiple (), m->min_noutput_items ());
if (ENABLE_LOGGING){
*d_log << " regular ";
if (m->relative_rate() >= 1.0)