summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/lib/block_executor.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-runtime/lib/block_executor.cc')
-rw-r--r--gnuradio-runtime/lib/block_executor.cc55
1 files changed, 39 insertions, 16 deletions
diff --git a/gnuradio-runtime/lib/block_executor.cc b/gnuradio-runtime/lib/block_executor.cc
index 5c23df39a5..30bdacecf5 100644
--- a/gnuradio-runtime/lib/block_executor.cc
+++ b/gnuradio-runtime/lib/block_executor.cc
@@ -73,14 +73,15 @@ namespace gr {
if(min_noutput_items == 0)
min_noutput_items = 1;
for(int i = 0; i < d->noutputs (); i++) {
- gr::thread::scoped_lock guard(*d->output(i)->mutex());
- int avail_n = round_down(d->output(i)->space_available(), output_multiple);
- int best_n = round_down(d->output(i)->bufsize()/2, output_multiple);
+ buffer_sptr out_buf = d->output(i);
+ gr::thread::scoped_lock guard(*out_buf->mutex());
+ int avail_n = round_down(out_buf->space_available(), output_multiple);
+ int best_n = round_down(out_buf->bufsize()/2, output_multiple);
if(best_n < min_noutput_items)
throw std::runtime_error("Buffer too small for min_noutput_items");
int n = std::min(avail_n, best_n);
if(n < min_noutput_items){ // We're blocked on output.
- if(d->output(i)->done()){ // Downstream is done, therefore we're done.
+ if(out_buf->done()){ // Downstream is done, therefore we're done.
return -1;
}
return 0;
@@ -106,16 +107,28 @@ namespace gr {
case block::TPP_CUSTOM:
return true;
case block::TPP_ALL_TO_ALL:
+ {
// every tag on every input propogates to everyone downstream
+ std::vector<buffer_sptr> out_buf;
+
for(int i = 0; i < d->ninputs(); i++) {
d->get_tags_in_range(rtags, i, start_nitems_read[i],
d->nitems_read(i), block_id);
+ if (rtags.size() == 0)
+ continue;
+
+ if (out_buf.size() == 0) {
+ out_buf.reserve(d->noutputs());
+ for(int o = 0; o < d->noutputs(); o++)
+ out_buf.push_back(d->output(o));
+ }
+
std::vector<tag_t>::iterator t;
if(rrate == 1.0) {
for(t = rtags.begin(); t != rtags.end(); t++) {
for(int o = 0; o < d->noutputs(); o++)
- d->output(o)->add_item_tag(*t);
+ out_buf[o]->add_item_tag(*t);
}
}
else {
@@ -123,25 +136,33 @@ namespace gr {
tag_t new_tag = *t;
new_tag.offset = ((double)new_tag.offset * rrate) + 0.5;
for(int o = 0; o < d->noutputs(); o++)
- d->output(o)->add_item_tag(new_tag);
+ out_buf[o]->add_item_tag(new_tag);
}
}
}
+ }
break;
case block::TPP_ONE_TO_ONE:
// tags from input i only go to output i
// this requires d->ninputs() == d->noutputs; this is checked when this
// type of tag-propagation system is selected in block_detail
if(d->ninputs() == d->noutputs()) {
+ buffer_sptr out_buf;
+
for(int i = 0; i < d->ninputs(); i++) {
d->get_tags_in_range(rtags, i, start_nitems_read[i],
d->nitems_read(i), block_id);
+ if (rtags.size() == 0)
+ continue;
+
+ out_buf = d->output(i);
+
std::vector<tag_t>::iterator t;
for(t = rtags.begin(); t != rtags.end(); t++) {
tag_t new_tag = *t;
new_tag.offset = ((double)new_tag.offset * rrate) + 0.5;
- d->output(i)->add_item_tag(new_tag);
+ out_buf->add_item_tag(new_tag);
}
}
}
@@ -149,7 +170,6 @@ namespace gr {
std::cerr << "Error: block_executor: propagation_policy 'ONE-TO-ONE' requires ninputs == noutputs" << std::endl;
return false;
}
-
break;
default:
return true;
@@ -244,9 +264,10 @@ namespace gr {
/*
* Acquire the mutex and grab local copies of items_available and done.
*/
- gr::thread::scoped_lock guard(*d->input(i)->mutex());
- d_ninput_items[i] = d->input(i)->items_available();
- d_input_done[i] = d->input(i)->done();
+ buffer_reader_sptr in_buf = d->input(i);
+ gr::thread::scoped_lock guard(*in_buf->mutex());
+ d_ninput_items[i] = in_buf->items_available();
+ d_input_done[i] = in_buf->done();
}
LOG(*d_log << " d_ninput_items[" << i << "] = " << d_ninput_items[i] << std::endl);
@@ -288,9 +309,10 @@ namespace gr {
/*
* Acquire the mutex and grab local copies of items_available and done.
*/
- gr::thread::scoped_lock guard(*d->input(i)->mutex());
- d_ninput_items[i] = d->input(i)->items_available ();
- d_input_done[i] = d->input(i)->done();
+ buffer_reader_sptr in_buf = d->input(i);
+ gr::thread::scoped_lock guard(*in_buf->mutex());
+ d_ninput_items[i] = in_buf->items_available ();
+ d_input_done[i] = in_buf->done();
}
max_items_avail = std::max(max_items_avail, d_ninput_items[i]);
}
@@ -403,7 +425,8 @@ namespace gr {
goto were_done;
// Is it possible to ever fulfill this request?
- if(d_ninput_items_required[i] > d->input(i)->max_possible_items_available()) {
+ buffer_reader_sptr in_buf = d->input(i);
+ if(d_ninput_items_required[i] > in_buf->max_possible_items_available()) {
// Nope, never going to happen...
std::cerr << "\nsched: <block " << m->name()
<< " (" << m->unique_id() << ")>"
@@ -412,7 +435,7 @@ namespace gr {
<< " ninput_items_required = "
<< d_ninput_items_required[i] << "\n"
<< " max_possible_items_available = "
- << d->input(i)->max_possible_items_available() << "\n"
+ << in_buf->max_possible_items_available() << "\n"
<< " If this is a filter, consider reducing the number of taps.\n";
goto were_done;
}