diff options
Diffstat (limited to 'gnuradio-runtime/lib/block_executor.cc')
-rw-r--r-- | gnuradio-runtime/lib/block_executor.cc | 55 |
1 files changed, 39 insertions, 16 deletions
diff --git a/gnuradio-runtime/lib/block_executor.cc b/gnuradio-runtime/lib/block_executor.cc index 5c23df39a5..30bdacecf5 100644 --- a/gnuradio-runtime/lib/block_executor.cc +++ b/gnuradio-runtime/lib/block_executor.cc @@ -73,14 +73,15 @@ namespace gr { if(min_noutput_items == 0) min_noutput_items = 1; for(int i = 0; i < d->noutputs (); i++) { - gr::thread::scoped_lock guard(*d->output(i)->mutex()); - int avail_n = round_down(d->output(i)->space_available(), output_multiple); - int best_n = round_down(d->output(i)->bufsize()/2, output_multiple); + buffer_sptr out_buf = d->output(i); + gr::thread::scoped_lock guard(*out_buf->mutex()); + int avail_n = round_down(out_buf->space_available(), output_multiple); + int best_n = round_down(out_buf->bufsize()/2, output_multiple); if(best_n < min_noutput_items) throw std::runtime_error("Buffer too small for min_noutput_items"); int n = std::min(avail_n, best_n); if(n < min_noutput_items){ // We're blocked on output. - if(d->output(i)->done()){ // Downstream is done, therefore we're done. + if(out_buf->done()){ // Downstream is done, therefore we're done. return -1; } return 0; @@ -106,16 +107,28 @@ namespace gr { case block::TPP_CUSTOM: return true; case block::TPP_ALL_TO_ALL: + { // every tag on every input propogates to everyone downstream + std::vector<buffer_sptr> out_buf; + for(int i = 0; i < d->ninputs(); i++) { d->get_tags_in_range(rtags, i, start_nitems_read[i], d->nitems_read(i), block_id); + if (rtags.size() == 0) + continue; + + if (out_buf.size() == 0) { + out_buf.reserve(d->noutputs()); + for(int o = 0; o < d->noutputs(); o++) + out_buf.push_back(d->output(o)); + } + std::vector<tag_t>::iterator t; if(rrate == 1.0) { for(t = rtags.begin(); t != rtags.end(); t++) { for(int o = 0; o < d->noutputs(); o++) - d->output(o)->add_item_tag(*t); + out_buf[o]->add_item_tag(*t); } } else { @@ -123,25 +136,33 @@ namespace gr { tag_t new_tag = *t; new_tag.offset = ((double)new_tag.offset * rrate) + 0.5; for(int o = 0; o < d->noutputs(); o++) - d->output(o)->add_item_tag(new_tag); + out_buf[o]->add_item_tag(new_tag); } } } + } break; case block::TPP_ONE_TO_ONE: // tags from input i only go to output i // this requires d->ninputs() == d->noutputs; this is checked when this // type of tag-propagation system is selected in block_detail if(d->ninputs() == d->noutputs()) { + buffer_sptr out_buf; + for(int i = 0; i < d->ninputs(); i++) { d->get_tags_in_range(rtags, i, start_nitems_read[i], d->nitems_read(i), block_id); + if (rtags.size() == 0) + continue; + + out_buf = d->output(i); + std::vector<tag_t>::iterator t; for(t = rtags.begin(); t != rtags.end(); t++) { tag_t new_tag = *t; new_tag.offset = ((double)new_tag.offset * rrate) + 0.5; - d->output(i)->add_item_tag(new_tag); + out_buf->add_item_tag(new_tag); } } } @@ -149,7 +170,6 @@ namespace gr { std::cerr << "Error: block_executor: propagation_policy 'ONE-TO-ONE' requires ninputs == noutputs" << std::endl; return false; } - break; default: return true; @@ -244,9 +264,10 @@ namespace gr { /* * Acquire the mutex and grab local copies of items_available and done. */ - gr::thread::scoped_lock guard(*d->input(i)->mutex()); - d_ninput_items[i] = d->input(i)->items_available(); - d_input_done[i] = d->input(i)->done(); + buffer_reader_sptr in_buf = d->input(i); + gr::thread::scoped_lock guard(*in_buf->mutex()); + d_ninput_items[i] = in_buf->items_available(); + d_input_done[i] = in_buf->done(); } LOG(*d_log << " d_ninput_items[" << i << "] = " << d_ninput_items[i] << std::endl); @@ -288,9 +309,10 @@ namespace gr { /* * Acquire the mutex and grab local copies of items_available and done. */ - gr::thread::scoped_lock guard(*d->input(i)->mutex()); - d_ninput_items[i] = d->input(i)->items_available (); - d_input_done[i] = d->input(i)->done(); + buffer_reader_sptr in_buf = d->input(i); + gr::thread::scoped_lock guard(*in_buf->mutex()); + d_ninput_items[i] = in_buf->items_available (); + d_input_done[i] = in_buf->done(); } max_items_avail = std::max(max_items_avail, d_ninput_items[i]); } @@ -403,7 +425,8 @@ namespace gr { goto were_done; // Is it possible to ever fulfill this request? - if(d_ninput_items_required[i] > d->input(i)->max_possible_items_available()) { + buffer_reader_sptr in_buf = d->input(i); + if(d_ninput_items_required[i] > in_buf->max_possible_items_available()) { // Nope, never going to happen... std::cerr << "\nsched: <block " << m->name() << " (" << m->unique_id() << ")>" @@ -412,7 +435,7 @@ namespace gr { << " ninput_items_required = " << d_ninput_items_required[i] << "\n" << " max_possible_items_available = " - << d->input(i)->max_possible_items_available() << "\n" + << in_buf->max_possible_items_available() << "\n" << " If this is a filter, consider reducing the number of taps.\n"; goto were_done; } |