summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/lib/block_executor.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-runtime/lib/block_executor.cc')
-rw-r--r--gnuradio-runtime/lib/block_executor.cc870
1 files changed, 442 insertions, 428 deletions
diff --git a/gnuradio-runtime/lib/block_executor.cc b/gnuradio-runtime/lib/block_executor.cc
index 40521e10b5..9f0d7ded5e 100644
--- a/gnuradio-runtime/lib/block_executor.cc
+++ b/gnuradio-runtime/lib/block_executor.cc
@@ -42,529 +42,543 @@ namespace gr {
#define ENABLE_LOGGING 0
#if (ENABLE_LOGGING)
-#define LOG(x) do { x; } while(0)
+#define LOG(x) \
+ do { \
+ x; \
+ } while (0)
#else
-#define LOG(x) do {;} while(0)
+#define LOG(x) \
+ do { \
+ ; \
+ } while (0)
#endif
- static int which_scheduler = 0;
+static int which_scheduler = 0;
- inline static unsigned int
- round_up(unsigned int n, unsigned int multiple)
- {
+inline static unsigned int round_up(unsigned int n, unsigned int multiple)
+{
return ((n + multiple - 1) / multiple) * multiple;
- }
+}
- inline static unsigned int
- round_down(unsigned int n, unsigned int multiple)
- {
+inline static unsigned int round_down(unsigned int n, unsigned int multiple)
+{
return (n / multiple) * multiple;
- }
-
- //
- // Return minimum available write space in all our downstream
- // buffers or -1 if we're output blocked and the output we're
- // blocked on is done.
- //
- static int
- min_available_space(block_detail *d, int output_multiple, int min_noutput_items)
- {
+}
+
+//
+// Return minimum available write space in all our downstream
+// buffers or -1 if we're output blocked and the output we're
+// blocked on is done.
+//
+static int
+min_available_space(block_detail* d, int output_multiple, int min_noutput_items)
+{
int min_space = std::numeric_limits<int>::max();
- if(min_noutput_items == 0)
- min_noutput_items = 1;
- for(int i = 0; i < d->noutputs (); i++) {
- buffer_sptr out_buf = d->output(i);
- gr::thread::scoped_lock guard(*out_buf->mutex());
- int avail_n = round_down(out_buf->space_available(), output_multiple);
- int best_n = round_down(out_buf->bufsize()/2, output_multiple);
- if(best_n < min_noutput_items)
- throw std::runtime_error("Buffer too small for min_noutput_items");
- int n = std::min(avail_n, best_n);
- if(n < min_noutput_items){ // We're blocked on output.
- if(out_buf->done()){ // Downstream is done, therefore we're done.
- return -1;
+ if (min_noutput_items == 0)
+ min_noutput_items = 1;
+ for (int i = 0; i < d->noutputs(); i++) {
+ buffer_sptr out_buf = d->output(i);
+ gr::thread::scoped_lock guard(*out_buf->mutex());
+ int avail_n = round_down(out_buf->space_available(), output_multiple);
+ int best_n = round_down(out_buf->bufsize() / 2, output_multiple);
+ if (best_n < min_noutput_items)
+ throw std::runtime_error("Buffer too small for min_noutput_items");
+ int n = std::min(avail_n, best_n);
+ if (n < min_noutput_items) { // We're blocked on output.
+ if (out_buf->done()) { // Downstream is done, therefore we're done.
+ return -1;
+ }
+ return 0;
}
- return 0;
- }
- min_space = std::min(min_space, n);
+ min_space = std::min(min_space, n);
}
return min_space;
- }
-
- static bool
- propagate_tags(block::tag_propagation_policy_t policy, block_detail *d,
- const std::vector<uint64_t> &start_nitems_read, double rrate,
- mpq_class &mp_rrate, bool use_fp_rrate,
- std::vector<tag_t> &rtags, long block_id)
- {
+}
+
+static bool propagate_tags(block::tag_propagation_policy_t policy,
+ block_detail* d,
+ const std::vector<uint64_t>& start_nitems_read,
+ double rrate,
+ mpq_class& mp_rrate,
+ bool use_fp_rrate,
+ std::vector<tag_t>& rtags,
+ long block_id)
+{
static const mpq_class one_half(1, 2);
// Move tags downstream
// if a sink, we don't need to move downstream
- if(d->sink_p()) {
- return true;
+ if (d->sink_p()) {
+ return true;
}
- switch(policy) {
+ switch (policy) {
case block::TPP_DONT:
case block::TPP_CUSTOM:
- return true;
- case block::TPP_ALL_TO_ALL:
- {
- // every tag on every input propagates to everyone downstream
- std::vector<buffer_sptr> out_buf;
-
- for(int i = 0; i < d->ninputs(); i++) {
- d->get_tags_in_range(rtags, i, start_nitems_read[i],
- d->nitems_read(i), block_id);
-
- if (rtags.size() == 0)
- continue;
-
- if (out_buf.size() == 0) {
- out_buf.reserve(d->noutputs());
- for(int o = 0; o < d->noutputs(); o++)
- out_buf.push_back(d->output(o));
- }
+ return true;
+ case block::TPP_ALL_TO_ALL: {
+ // every tag on every input propagates to everyone downstream
+ std::vector<buffer_sptr> out_buf;
+
+ for (int i = 0; i < d->ninputs(); i++) {
+ d->get_tags_in_range(
+ rtags, i, start_nitems_read[i], d->nitems_read(i), block_id);
+
+ if (rtags.size() == 0)
+ continue;
+
+ if (out_buf.size() == 0) {
+ out_buf.reserve(d->noutputs());
+ for (int o = 0; o < d->noutputs(); o++)
+ out_buf.push_back(d->output(o));
+ }
- std::vector<tag_t>::iterator t;
- if(rrate == 1.0) {
- for(t = rtags.begin(); t != rtags.end(); t++) {
- for(int o = 0; o < d->noutputs(); o++)
- out_buf[o]->add_item_tag(*t);
- }
- }
- else if(use_fp_rrate) {
- for(t = rtags.begin(); t != rtags.end(); t++) {
- tag_t new_tag = *t;
- new_tag.offset = ((double)new_tag.offset * rrate) + 0.5;
- for(int o = 0; o < d->noutputs(); o++)
- out_buf[o]->add_item_tag(new_tag);
- }
- }
- else {
- mpz_class offset;
- for(t = rtags.begin(); t != rtags.end(); t++) {
- tag_t new_tag = *t;
- mpz_import(offset.get_mpz_t(), 1, 1, sizeof(new_tag.offset), 0, 0, &new_tag.offset);
- offset = offset * mp_rrate + one_half;
- new_tag.offset = offset.get_ui();
- for(int o = 0; o < d->noutputs(); o++)
- out_buf[o]->add_item_tag(new_tag);
- }
+ std::vector<tag_t>::iterator t;
+ if (rrate == 1.0) {
+ for (t = rtags.begin(); t != rtags.end(); t++) {
+ for (int o = 0; o < d->noutputs(); o++)
+ out_buf[o]->add_item_tag(*t);
+ }
+ } else if (use_fp_rrate) {
+ for (t = rtags.begin(); t != rtags.end(); t++) {
+ tag_t new_tag = *t;
+ new_tag.offset = ((double)new_tag.offset * rrate) + 0.5;
+ for (int o = 0; o < d->noutputs(); o++)
+ out_buf[o]->add_item_tag(new_tag);
+ }
+ } else {
+ mpz_class offset;
+ for (t = rtags.begin(); t != rtags.end(); t++) {
+ tag_t new_tag = *t;
+ mpz_import(offset.get_mpz_t(),
+ 1,
+ 1,
+ sizeof(new_tag.offset),
+ 0,
+ 0,
+ &new_tag.offset);
+ offset = offset * mp_rrate + one_half;
+ new_tag.offset = offset.get_ui();
+ for (int o = 0; o < d->noutputs(); o++)
+ out_buf[o]->add_item_tag(new_tag);
+ }
+ }
}
- }
- }
- break;
+ } break;
case block::TPP_ONE_TO_ONE:
- // tags from input i only go to output i
- // this requires d->ninputs() == d->noutputs; this is checked when this
- // type of tag-propagation system is selected in block_detail
- if(d->ninputs() == d->noutputs()) {
- buffer_sptr out_buf;
-
- for(int i = 0; i < d->ninputs(); i++) {
- d->get_tags_in_range(rtags, i, start_nitems_read[i],
- d->nitems_read(i), block_id);
-
- if (rtags.size() == 0)
- continue;
-
- out_buf = d->output(i);
-
- std::vector<tag_t>::iterator t;
- if(rrate == 1.0) {
- for(t = rtags.begin(); t != rtags.end(); t++) {
- out_buf->add_item_tag(*t);
- }
- }
- else if(use_fp_rrate) {
- for(t = rtags.begin(); t != rtags.end(); t++) {
- tag_t new_tag = *t;
- new_tag.offset = ((double)new_tag.offset * rrate) + 0.5;
- out_buf->add_item_tag(new_tag);
+ // tags from input i only go to output i
+ // this requires d->ninputs() == d->noutputs; this is checked when this
+ // type of tag-propagation system is selected in block_detail
+ if (d->ninputs() == d->noutputs()) {
+ buffer_sptr out_buf;
+
+ for (int i = 0; i < d->ninputs(); i++) {
+ d->get_tags_in_range(
+ rtags, i, start_nitems_read[i], d->nitems_read(i), block_id);
+
+ if (rtags.size() == 0)
+ continue;
+
+ out_buf = d->output(i);
+
+ std::vector<tag_t>::iterator t;
+ if (rrate == 1.0) {
+ for (t = rtags.begin(); t != rtags.end(); t++) {
+ out_buf->add_item_tag(*t);
+ }
+ } else if (use_fp_rrate) {
+ for (t = rtags.begin(); t != rtags.end(); t++) {
+ tag_t new_tag = *t;
+ new_tag.offset = ((double)new_tag.offset * rrate) + 0.5;
+ out_buf->add_item_tag(new_tag);
+ }
+ } else {
+ mpz_class offset;
+ for (t = rtags.begin(); t != rtags.end(); t++) {
+ tag_t new_tag = *t;
+ mpz_import(offset.get_mpz_t(),
+ 1,
+ 1,
+ sizeof(new_tag.offset),
+ 0,
+ 0,
+ &new_tag.offset);
+ offset = offset * mp_rrate + one_half;
+ new_tag.offset = offset.get_ui();
+ out_buf->add_item_tag(new_tag);
+ }
+ }
}
- }
- else {
- mpz_class offset;
- for(t = rtags.begin(); t != rtags.end(); t++) {
- tag_t new_tag = *t;
- mpz_import(offset.get_mpz_t(), 1, 1, sizeof(new_tag.offset), 0, 0, &new_tag.offset);
- offset = offset * mp_rrate + one_half;
- new_tag.offset = offset.get_ui();
- out_buf->add_item_tag(new_tag);
- }
- }
+ } else {
+ std::cerr << "Error: block_executor: propagation_policy 'ONE-TO-ONE' "
+ "requires ninputs == noutputs"
+ << std::endl;
+ return false;
}
- }
- else {
- std::cerr << "Error: block_executor: propagation_policy 'ONE-TO-ONE' requires ninputs == noutputs" << std::endl;
- return false;
- }
- break;
+ break;
default:
- return true;
+ return true;
}
return true;
- }
+}
- block_executor::block_executor(block_sptr block, int max_noutput_items)
+block_executor::block_executor(block_sptr block, int max_noutput_items)
: d_block(block), d_log(0), d_max_noutput_items(max_noutput_items)
- {
- if(ENABLE_LOGGING) {
- std::string name = str(boost::format("sst-%03d.log") % which_scheduler++);
- d_log = new std::ofstream(name.c_str());
- std::unitbuf(*d_log); // make it unbuffered...
- *d_log << "block_executor: "
- << d_block << std::endl;
+{
+ if (ENABLE_LOGGING) {
+ std::string name = str(boost::format("sst-%03d.log") % which_scheduler++);
+ d_log = new std::ofstream(name.c_str());
+ std::unitbuf(*d_log); // make it unbuffered...
+ *d_log << "block_executor: " << d_block << std::endl;
}
#ifdef GR_PERFORMANCE_COUNTERS
- prefs *prefs = prefs::singleton();
+ prefs* prefs = prefs::singleton();
d_use_pc = prefs->get_bool("PerfCounters", "on", false);
#endif /* GR_PERFORMANCE_COUNTERS */
- d_block->start(); // enable any drivers, etc.
- }
+ d_block->start(); // enable any drivers, etc.
+}
- block_executor::~block_executor()
- {
- if(ENABLE_LOGGING)
- delete d_log;
+block_executor::~block_executor()
+{
+ if (ENABLE_LOGGING)
+ delete d_log;
- d_block->stop(); // stop any drivers, etc.
- }
+ d_block->stop(); // stop any drivers, etc.
+}
- block_executor::state
- block_executor::run_one_iteration()
- {
+block_executor::state block_executor::run_one_iteration()
+{
int noutput_items;
int max_items_avail;
int max_noutput_items;
int new_alignment = 0;
int alignment_state = -1;
- block *m = d_block.get();
- block_detail *d = m->detail().get();
+ block* m = d_block.get();
+ block_detail* d = m->detail().get();
LOG(*d_log << std::endl << m);
max_noutput_items = round_down(d_max_noutput_items, m->output_multiple());
- if(d->done()){
- assert(0);
- return DONE;
+ if (d->done()) {
+ assert(0);
+ return DONE;
}
- if(d->source_p ()) {
- d_ninput_items_required.resize(0);
- d_ninput_items.resize(0);
- d_input_items.resize(0);
- d_input_done.resize(0);
- d_output_items.resize(d->noutputs());
- d_start_nitems_read.resize(0);
-
- // determine the minimum available output space
- noutput_items = min_available_space(d, m->output_multiple (), m->min_noutput_items ());
- noutput_items = std::min(noutput_items, max_noutput_items);
- LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
- if(noutput_items == -1) // we're done
- goto were_done;
-
- if(noutput_items == 0){ // we're output blocked
- LOG(*d_log << " BLKD_OUT\n");
- return BLKD_OUT;
- }
-
- goto setup_call_to_work; // jump to common code
+ if (d->source_p()) {
+ d_ninput_items_required.resize(0);
+ d_ninput_items.resize(0);
+ d_input_items.resize(0);
+ d_input_done.resize(0);
+ d_output_items.resize(d->noutputs());
+ d_start_nitems_read.resize(0);
+
+ // determine the minimum available output space
+ noutput_items =
+ min_available_space(d, m->output_multiple(), m->min_noutput_items());
+ noutput_items = std::min(noutput_items, max_noutput_items);
+ LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
+ if (noutput_items == -1) // we're done
+ goto were_done;
+
+ if (noutput_items == 0) { // we're output blocked
+ LOG(*d_log << " BLKD_OUT\n");
+ return BLKD_OUT;
+ }
+
+ goto setup_call_to_work; // jump to common code
}
- else if(d->sink_p ()) {
- d_ninput_items_required.resize(d->ninputs ());
- d_ninput_items.resize(d->ninputs ());
- d_input_items.resize(d->ninputs ());
- d_input_done.resize(d->ninputs());
- d_output_items.resize (0);
- d_start_nitems_read.resize(d->ninputs());
- LOG(*d_log << " sink\n");
-
- max_items_avail = 0;
- for(int i = 0; i < d->ninputs (); i++) {
- {
- /*
- * Acquire the mutex and grab local copies of items_available and done.
- */
- buffer_reader_sptr in_buf = d->input(i);
- gr::thread::scoped_lock guard(*in_buf->mutex());
- d_ninput_items[i] = in_buf->items_available();
- d_input_done[i] = in_buf->done();
- }
+ else if (d->sink_p()) {
+ d_ninput_items_required.resize(d->ninputs());
+ d_ninput_items.resize(d->ninputs());
+ d_input_items.resize(d->ninputs());
+ d_input_done.resize(d->ninputs());
+ d_output_items.resize(0);
+ d_start_nitems_read.resize(d->ninputs());
+ LOG(*d_log << " sink\n");
+
+ max_items_avail = 0;
+ for (int i = 0; i < d->ninputs(); i++) {
+ {
+ /*
+ * Acquire the mutex and grab local copies of items_available and done.
+ */
+ buffer_reader_sptr in_buf = d->input(i);
+ gr::thread::scoped_lock guard(*in_buf->mutex());
+ d_ninput_items[i] = in_buf->items_available();
+ d_input_done[i] = in_buf->done();
+ }
- LOG(*d_log << " d_ninput_items[" << i << "] = " << d_ninput_items[i] << std::endl);
- LOG(*d_log << " d_input_done[" << i << "] = " << d_input_done[i] << std::endl);
+ LOG(*d_log << " d_ninput_items[" << i << "] = " << d_ninput_items[i]
+ << std::endl);
+ LOG(*d_log << " d_input_done[" << i << "] = " << d_input_done[i]
+ << std::endl);
- if (d_ninput_items[i] < m->output_multiple() && d_input_done[i])
- goto were_done;
+ if (d_ninput_items[i] < m->output_multiple() && d_input_done[i])
+ goto were_done;
- max_items_avail = std::max (max_items_avail, d_ninput_items[i]);
- }
+ max_items_avail = std::max(max_items_avail, d_ninput_items[i]);
+ }
- // take a swag at how much output we can sink
- noutput_items = (int)(max_items_avail * m->relative_rate ());
- noutput_items = round_down(noutput_items, m->output_multiple ());
- noutput_items = std::min(noutput_items, max_noutput_items);
- LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl);
- LOG(*d_log << " noutput_items = " << noutput_items << std::endl);
+ // take a swag at how much output we can sink
+ noutput_items = (int)(max_items_avail * m->relative_rate());
+ noutput_items = round_down(noutput_items, m->output_multiple());
+ noutput_items = std::min(noutput_items, max_noutput_items);
+ LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl);
+ LOG(*d_log << " noutput_items = " << noutput_items << std::endl);
- if(noutput_items == 0) { // we're blocked on input
- LOG(*d_log << " BLKD_IN\n");
- return BLKD_IN;
- }
+ if (noutput_items == 0) { // we're blocked on input
+ LOG(*d_log << " BLKD_IN\n");
+ return BLKD_IN;
+ }
- goto try_again; // Jump to code shared with regular case.
+ goto try_again; // Jump to code shared with regular case.
}
else {
- // do the regular thing
- d_ninput_items_required.resize (d->ninputs ());
- d_ninput_items.resize (d->ninputs ());
- d_input_items.resize (d->ninputs ());
- d_input_done.resize(d->ninputs());
- d_output_items.resize (d->noutputs ());
- d_start_nitems_read.resize(d->ninputs());
-
- max_items_avail = 0;
- for(int i = 0; i < d->ninputs (); i++) {
- {
- /*
- * Acquire the mutex and grab local copies of items_available and done.
- */
- buffer_reader_sptr in_buf = d->input(i);
- gr::thread::scoped_lock guard(*in_buf->mutex());
- d_ninput_items[i] = in_buf->items_available ();
- d_input_done[i] = in_buf->done();
+ // do the regular thing
+ d_ninput_items_required.resize(d->ninputs());
+ d_ninput_items.resize(d->ninputs());
+ d_input_items.resize(d->ninputs());
+ d_input_done.resize(d->ninputs());
+ d_output_items.resize(d->noutputs());
+ d_start_nitems_read.resize(d->ninputs());
+
+ max_items_avail = 0;
+ for (int i = 0; i < d->ninputs(); i++) {
+ {
+ /*
+ * Acquire the mutex and grab local copies of items_available and done.
+ */
+ buffer_reader_sptr in_buf = d->input(i);
+ gr::thread::scoped_lock guard(*in_buf->mutex());
+ d_ninput_items[i] = in_buf->items_available();
+ d_input_done[i] = in_buf->done();
+ }
+ max_items_avail = std::max(max_items_avail, d_ninput_items[i]);
}
- max_items_avail = std::max(max_items_avail, d_ninput_items[i]);
- }
-
- // determine the minimum available output space
- noutput_items = min_available_space(d, m->output_multiple(), m->min_noutput_items());
- if(ENABLE_LOGGING) {
- *d_log << " regular ";
- *d_log << m->relative_rate_i() << ":"
- << m->relative_rate_d() << std::endl;
- *d_log << " max_items_avail = " << max_items_avail << std::endl;
- *d_log << " noutput_items = " << noutput_items << std::endl;
- }
- if(noutput_items == -1) // we're done
- goto were_done;
-
- if(noutput_items == 0) { // we're output blocked
- LOG(*d_log << " BLKD_OUT\n");
- return BLKD_OUT;
- }
- try_again:
- if(m->fixed_rate()) {
- // try to work it forward starting with max_items_avail.
- // We want to try to consume all the input we've got.
- int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
-
- // only test this if we specifically set the output_multiple
- if(m->output_multiple_set())
- reqd_noutput_items = round_down(reqd_noutput_items, m->output_multiple());
-
- if(reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
- noutput_items = reqd_noutput_items;
-
- // if we need this many outputs, overrule the max_noutput_items setting
- max_noutput_items = std::max(m->output_multiple(), max_noutput_items);
- }
- noutput_items = std::min(noutput_items, max_noutput_items);
-
- // Check if we're still unaligned; use up items until we're
- // aligned again. Otherwise, make sure we set the alignment
- // requirement.
- if(!m->output_multiple_set()) {
- if(m->is_unaligned()) {
- // When unaligned, don't just set noutput_items to the remaining
- // samples to meet alignment; this causes too much overhead in
- // requiring a premature call back here. Set the maximum amount
- // of samples to handle unalignment and get us back aligned.
- if(noutput_items >= m->unaligned()) {
- noutput_items = round_up(noutput_items, m->alignment()) \
- - (m->alignment() - m->unaligned());
- new_alignment = 0;
- }
- else {
- new_alignment = m->unaligned() - noutput_items;
- }
- alignment_state = 0;
+ // determine the minimum available output space
+ noutput_items =
+ min_available_space(d, m->output_multiple(), m->min_noutput_items());
+ if (ENABLE_LOGGING) {
+ *d_log << " regular ";
+ *d_log << m->relative_rate_i() << ":" << m->relative_rate_d() << std::endl;
+ *d_log << " max_items_avail = " << max_items_avail << std::endl;
+ *d_log << " noutput_items = " << noutput_items << std::endl;
}
- else if(noutput_items < m->alignment()) {
- // if we don't have enough for an aligned call, keep track of
- // misalignment, set unaligned flag, and proceed.
- new_alignment = m->alignment() - noutput_items;
- m->set_unaligned(new_alignment);
- m->set_is_unaligned(true);
- alignment_state = 1;
+ if (noutput_items == -1) // we're done
+ goto were_done;
+
+ if (noutput_items == 0) { // we're output blocked
+ LOG(*d_log << " BLKD_OUT\n");
+ return BLKD_OUT;
}
- else {
- // enough to round down to the nearest alignment and process.
- noutput_items = round_down(noutput_items, m->alignment());
- m->set_is_unaligned(false);
- alignment_state = 2;
+
+ try_again:
+ if (m->fixed_rate()) {
+ // try to work it forward starting with max_items_avail.
+ // We want to try to consume all the input we've got.
+ int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
+
+ // only test this if we specifically set the output_multiple
+ if (m->output_multiple_set())
+ reqd_noutput_items = round_down(reqd_noutput_items, m->output_multiple());
+
+ if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
+ noutput_items = reqd_noutput_items;
+
+ // if we need this many outputs, overrule the max_noutput_items setting
+ max_noutput_items = std::max(m->output_multiple(), max_noutput_items);
}
- }
-
- // ask the block how much input they need to produce noutput_items
- m->forecast (noutput_items, d_ninput_items_required);
-
- // See if we've got sufficient input available and make sure we
- // didn't overflow on the input.
- int i;
- for(i = 0; i < d->ninputs (); i++) {
- if(d_ninput_items_required[i] > d_ninput_items[i]) // not enough
- break;
-
- if(d_ninput_items_required[i] < 0) {
- std::cerr << "\nsched: <block " << m->name()
- << " (" << m->unique_id() << ")>"
- << " thinks its ninput_items required is "
- << d_ninput_items_required[i]
- << " and cannot be negative.\n"
- << "Some parameterization is wrong. "
- << "Too large a decimation value?\n\n";
- goto were_done;
+ noutput_items = std::min(noutput_items, max_noutput_items);
+
+ // Check if we're still unaligned; use up items until we're
+ // aligned again. Otherwise, make sure we set the alignment
+ // requirement.
+ if (!m->output_multiple_set()) {
+ if (m->is_unaligned()) {
+ // When unaligned, don't just set noutput_items to the remaining
+ // samples to meet alignment; this causes too much overhead in
+ // requiring a premature call back here. Set the maximum amount
+ // of samples to handle unalignment and get us back aligned.
+ if (noutput_items >= m->unaligned()) {
+ noutput_items = round_up(noutput_items, m->alignment()) -
+ (m->alignment() - m->unaligned());
+ new_alignment = 0;
+ } else {
+ new_alignment = m->unaligned() - noutput_items;
+ }
+ alignment_state = 0;
+ } else if (noutput_items < m->alignment()) {
+ // if we don't have enough for an aligned call, keep track of
+ // misalignment, set unaligned flag, and proceed.
+ new_alignment = m->alignment() - noutput_items;
+ m->set_unaligned(new_alignment);
+ m->set_is_unaligned(true);
+ alignment_state = 1;
+ } else {
+ // enough to round down to the nearest alignment and process.
+ noutput_items = round_down(noutput_items, m->alignment());
+ m->set_is_unaligned(false);
+ alignment_state = 2;
+ }
}
- }
-
- if(i < d->ninputs()) { // not enough input on input[i]
- // if we can, try reducing the size of our output request
- if(noutput_items > m->output_multiple()) {
- noutput_items /= 2;
- noutput_items = round_up(noutput_items, m->output_multiple());
- goto try_again;
+
+ // ask the block how much input they need to produce noutput_items
+ m->forecast(noutput_items, d_ninput_items_required);
+
+ // See if we've got sufficient input available and make sure we
+ // didn't overflow on the input.
+ int i;
+ for (i = 0; i < d->ninputs(); i++) {
+ if (d_ninput_items_required[i] > d_ninput_items[i]) // not enough
+ break;
+
+ if (d_ninput_items_required[i] < 0) {
+ std::cerr << "\nsched: <block " << m->name() << " (" << m->unique_id()
+ << ")>"
+ << " thinks its ninput_items required is "
+ << d_ninput_items_required[i] << " and cannot be negative.\n"
+ << "Some parameterization is wrong. "
+ << "Too large a decimation value?\n\n";
+ goto were_done;
+ }
}
- // We're blocked on input
- LOG(*d_log << " BLKD_IN\n");
- if(d_input_done[i]) // If the upstream block is done, we're done
- goto were_done;
+ if (i < d->ninputs()) { // not enough input on input[i]
+ // if we can, try reducing the size of our output request
+ if (noutput_items > m->output_multiple()) {
+ noutput_items /= 2;
+ noutput_items = round_up(noutput_items, m->output_multiple());
+ goto try_again;
+ }
- // Is it possible to ever fulfill this request?
- buffer_reader_sptr in_buf = d->input(i);
- if(d_ninput_items_required[i] > in_buf->max_possible_items_available()) {
- // Nope, never going to happen...
- std::cerr << "\nsched: <block " << m->name()
- << " (" << m->unique_id() << ")>"
+ // We're blocked on input
+ LOG(*d_log << " BLKD_IN\n");
+ if (d_input_done[i]) // If the upstream block is done, we're done
+ goto were_done;
+
+ // Is it possible to ever fulfill this request?
+ buffer_reader_sptr in_buf = d->input(i);
+ if (d_ninput_items_required[i] > in_buf->max_possible_items_available()) {
+ // Nope, never going to happen...
+ std::cerr
+ << "\nsched: <block " << m->name() << " (" << m->unique_id() << ")>"
<< " is requesting more input data\n"
<< " than we can provide.\n"
- << " ninput_items_required = "
- << d_ninput_items_required[i] << "\n"
+ << " ninput_items_required = " << d_ninput_items_required[i] << "\n"
<< " max_possible_items_available = "
<< in_buf->max_possible_items_available() << "\n"
<< " If this is a filter, consider reducing the number of taps.\n";
- goto were_done;
- }
+ goto were_done;
+ }
- // If we were made unaligned in this round but return here without
- // processing; reset the unalignment claim before next entry.
- if(alignment_state == 1) {
- m->set_unaligned(0);
- m->set_is_unaligned(false);
+ // If we were made unaligned in this round but return here without
+ // processing; reset the unalignment claim before next entry.
+ if (alignment_state == 1) {
+ m->set_unaligned(0);
+ m->set_is_unaligned(false);
+ }
+ return BLKD_IN;
}
- return BLKD_IN;
- }
- // We've got enough data on each input to produce noutput_items.
- // Finish setting up the call to work.
- for(int i = 0; i < d->ninputs (); i++)
- d_input_items[i] = d->input(i)->read_pointer();
+ // We've got enough data on each input to produce noutput_items.
+ // Finish setting up the call to work.
+ for (int i = 0; i < d->ninputs(); i++)
+ d_input_items[i] = d->input(i)->read_pointer();
setup_call_to_work:
- d->d_produce_or = 0;
- for(int i = 0; i < d->noutputs (); i++)
- d_output_items[i] = d->output(i)->write_pointer();
+ d->d_produce_or = 0;
+ for (int i = 0; i < d->noutputs(); i++)
+ d_output_items[i] = d->output(i)->write_pointer();
- // determine where to start looking for new tags
- for(int i = 0; i < d->ninputs(); i++)
- d_start_nitems_read[i] = d->nitems_read(i);
+ // determine where to start looking for new tags
+ for (int i = 0; i < d->ninputs(); i++)
+ d_start_nitems_read[i] = d->nitems_read(i);
#ifdef GR_PERFORMANCE_COUNTERS
- if(d_use_pc)
- d->start_perf_counters();
+ if (d_use_pc)
+ d->start_perf_counters();
#endif /* GR_PERFORMANCE_COUNTERS */
- // Do the actual work of the block
- int n = m->general_work(noutput_items, d_ninput_items,
- d_input_items, d_output_items);
+ // Do the actual work of the block
+ int n =
+ m->general_work(noutput_items, d_ninput_items, d_input_items, d_output_items);
#ifdef GR_PERFORMANCE_COUNTERS
- if(d_use_pc)
- d->stop_perf_counters(noutput_items, n);
+ if (d_use_pc)
+ d->stop_perf_counters(noutput_items, n);
#endif /* GR_PERFORMANCE_COUNTERS */
- LOG(*d_log << " general_work: noutput_items = " << noutput_items
- << " result = " << n << std::endl);
-
- // Adjust number of unaligned items left to process
- if(m->is_unaligned()) {
- m->set_unaligned(new_alignment);
- m->set_is_unaligned(m->unaligned() != 0);
- }
-
- // Now propagate the tags based on the new relative rate
- if(!propagate_tags(m->tag_propagation_policy(), d,
- d_start_nitems_read, m->relative_rate(),
- m->mp_relative_rate(), m->update_rate(),
- d_returned_tags, m->unique_id()))
- goto were_done;
-
- if(n == block::WORK_DONE)
- goto were_done;
-
- if(n != block::WORK_CALLED_PRODUCE)
- d->produce_each(n); // advance write pointers
-
- // For some blocks that can change their produce/consume ratio
- // (the relative_rate), we might want to automatically update
- // based on the amount of items written/read.
- // In the block constructor, use enable_update_rate(true).
- if(m->update_rate()) {
- //rrate = ((double)(m->nitems_written(0))) / ((double)m->nitems_read(0));
- //if(rrate > 0.0)
- // m->set_relative_rate(rrate);
- if((n > 0) && (d->consumed() > 0))
- m->set_relative_rate((uint64_t)n, (uint64_t)d->consumed());
- }
-
- if(d->d_produce_or > 0) // block produced something
- return READY;
-
- // We didn't produce any output even though we called general_work.
- // We have (most likely) consumed some input.
-
- /*
- // If this is a source, it's broken.
- if(d->source_p()) {
- std::cerr << "block_executor: source " << m
- << " produced no output. We're marking it DONE.\n";
- // FIXME maybe we ought to raise an exception...
- goto were_done;
- }
- */
-
- // Have the caller try again...
- return READY_NO_OUTPUT;
+ LOG(*d_log << " general_work: noutput_items = " << noutput_items
+ << " result = " << n << std::endl);
+
+ // Adjust number of unaligned items left to process
+ if (m->is_unaligned()) {
+ m->set_unaligned(new_alignment);
+ m->set_is_unaligned(m->unaligned() != 0);
+ }
+
+ // Now propagate the tags based on the new relative rate
+ if (!propagate_tags(m->tag_propagation_policy(),
+ d,
+ d_start_nitems_read,
+ m->relative_rate(),
+ m->mp_relative_rate(),
+ m->update_rate(),
+ d_returned_tags,
+ m->unique_id()))
+ goto were_done;
+
+ if (n == block::WORK_DONE)
+ goto were_done;
+
+ if (n != block::WORK_CALLED_PRODUCE)
+ d->produce_each(n); // advance write pointers
+
+ // For some blocks that can change their produce/consume ratio
+ // (the relative_rate), we might want to automatically update
+ // based on the amount of items written/read.
+ // In the block constructor, use enable_update_rate(true).
+ if (m->update_rate()) {
+ // rrate = ((double)(m->nitems_written(0))) / ((double)m->nitems_read(0));
+ // if(rrate > 0.0)
+ // m->set_relative_rate(rrate);
+ if ((n > 0) && (d->consumed() > 0))
+ m->set_relative_rate((uint64_t)n, (uint64_t)d->consumed());
+ }
+
+ if (d->d_produce_or > 0) // block produced something
+ return READY;
+
+ // We didn't produce any output even though we called general_work.
+ // We have (most likely) consumed some input.
+
+ /*
+ // If this is a source, it's broken.
+ if(d->source_p()) {
+ std::cerr << "block_executor: source " << m
+ << " produced no output. We're marking it DONE.\n";
+ // FIXME maybe we ought to raise an exception...
+ goto were_done;
+ }
+ */
+
+ // Have the caller try again...
+ return READY_NO_OUTPUT;
}
assert(0);
- were_done:
+were_done:
LOG(*d_log << " were_done\n");
- d->set_done (true);
+ d->set_done(true);
return DONE;
- }
+}
} /* namespace gr */