diff options
author | David Sorber <david.sorber@blacklynx.tech> | 2021-05-12 08:59:21 -0400 |
---|---|---|
committer | mormj <34754695+mormj@users.noreply.github.com> | 2021-10-25 11:27:01 -0400 |
commit | 788827ae116bef871e144abd39b1e4482208eabe (patch) | |
tree | dcfee04a77db5bb3c8042be5b0b95c54bf8759c9 /gnuradio-runtime/lib/block_executor.cc | |
parent | b8713810a2d07ac1a632bd7bfb23f3f48f67e222 (diff) |
runtime: Custom Buffer/Accelerator Device Support - Milestone 1
Custom Buffer/Accelerator Device Support - Milestone 1 changes:
* Refactored existing single-mapped buffer code and created a single-mapped
buffer abstraction; wrapping within single-mapped buffers is handled
explicitly by the input-blocked and output-blocked callbacks that are
called from block_executor
* Added simple custom buffer allocation interface (NOTE: this
interface will change for milestone 2)
* Accelerated blocks are still responsible for data transfer, but the
custom buffer interface eliminates the double-copy problem
Signed-off-by: David Sorber <david.sorber@blacklynx.tech>
Diffstat (limited to 'gnuradio-runtime/lib/block_executor.cc')
-rw-r--r-- | gnuradio-runtime/lib/block_executor.cc | 206 |
1 files changed, 168 insertions, 38 deletions
diff --git a/gnuradio-runtime/lib/block_executor.cc b/gnuradio-runtime/lib/block_executor.cc index c026934bd9..6fbf2c5c17 100644 --- a/gnuradio-runtime/lib/block_executor.cc +++ b/gnuradio-runtime/lib/block_executor.cc @@ -14,13 +14,13 @@ #include <gnuradio/block.h> #include <gnuradio/block_detail.h> -#include <gnuradio/buffer.h> -#include <gnuradio/logger.h> +#include <gnuradio/custom_lock.h> #include <gnuradio/prefs.h> #include <block_executor.h> #include <limits> #include <sstream> + namespace gr { // must be defined to either 0 or 1 @@ -53,27 +53,55 @@ inline static unsigned int round_down(unsigned int n, unsigned int multiple) // buffers or -1 if we're output blocked and the output we're // blocked on is done. // -static int -min_available_space(block_detail* d, int output_multiple, int min_noutput_items) +static int min_available_space(block* m, + block_detail* d, + int output_multiple, + int min_noutput_items, + int& output_idx) { + gr::logger_ptr logger; + gr::logger_ptr debug_logger; + gr::configure_default_loggers(logger, debug_logger, "min_available_space"); + int min_space = std::numeric_limits<int>::max(); if (min_noutput_items == 0) min_noutput_items = 1; - for (int i = 0; i < d->noutputs(); i++) { + for (int i = output_idx; i < d->noutputs(); i++) { buffer_sptr out_buf = d->output(i); gr::thread::scoped_lock guard(*out_buf->mutex()); - int avail_n = round_down(out_buf->space_available(), output_multiple); + int space_avail = out_buf->space_available(); + int avail_n = round_down(space_avail, output_multiple); + // If not strictly output multiple size aligned, potentially use all + // space available in the output buffer + if (avail_n == 0 && space_avail < output_multiple && !m->output_multiple_set()) { + avail_n = std::max(avail_n, space_avail); + } int best_n = round_down(out_buf->bufsize() / 2, output_multiple); if (best_n < min_noutput_items) throw std::runtime_error("Buffer too small for min_noutput_items"); int n = std::min(avail_n, best_n); if 
(n < min_noutput_items) { // We're blocked on output. - if (out_buf->done()) { // Downstream is done, therefore we're done. + LOG(std::ostringstream msg; + msg << m << " **[i=" << i << "] output_multiple=" << output_multiple + << " min_noutput_items=" << min_noutput_items + << " avail_n=" << avail_n << " best_n=" << best_n << " n=" << n + << " min_space=" << min_space << " outbuf_done=" << out_buf->done(); + GR_LOG_INFO(debug_logger, msg.str());); + + if (out_buf->done()) { // Downstream is done, therefore we're done. return -1; } + + output_idx = i; return 0; } min_space = std::min(min_space, n); + + LOG(std::ostringstream msg; + msg << m << " [i=" << i << "] output_multiple=" << output_multiple + << " min_noutput_items=" << min_noutput_items << " avail_n=" << avail_n + << " best_n=" << best_n << " n=" << n << " min_space=" << min_space; + GR_LOG_INFO(debug_logger, msg.str());); } return min_space; } @@ -233,6 +261,7 @@ block_executor::state block_executor::run_one_iteration() int max_noutput_items; int new_alignment = 0; int alignment_state = -1; + int output_idx = 0; block* m = d_block.get(); block_detail* d = m->detail().get(); @@ -255,8 +284,10 @@ block_executor::state block_executor::run_one_iteration() d_start_nitems_read.resize(0); // determine the minimum available output space - noutput_items = - min_available_space(d, m->output_multiple(), m->min_noutput_items()); + output_idx = 0; + out_try_again: + noutput_items = min_available_space( + m, d, m->output_multiple(), m->min_noutput_items(), output_idx); noutput_items = std::min(noutput_items, max_noutput_items); LOG(std::ostringstream msg; msg << "source: noutput_items = " << noutput_items; GR_LOG_INFO(d_debug_logger, msg.str());); @@ -264,8 +295,28 @@ block_executor::state block_executor::run_one_iteration() goto were_done; if (noutput_items == 0) { // we're output blocked - LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT");); - return BLKD_OUT; + LOG(std::ostringstream msg; msg << m << " -- BLKD_OUT"; + 
GR_LOG_INFO(d_debug_logger, msg.str());); + + buffer_sptr out_buf = d->output(output_idx); + + if (out_buf->output_blkd_cb_ready(m->output_multiple())) { + gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf); + if (!out_buf->output_blocked_callback(m->output_multiple())) { + LOG(std::ostringstream msg; + msg << m << " -- BLKD_OUT -- ([1] callback FAILED)"; + GR_LOG_INFO(d_debug_logger, msg.str());); + return BLKD_OUT; + } else { + LOG(std::ostringstream msg; + msg << m << " -- BLKD_OUT -- ([1] try again idx: " << output_idx + << ")"; + GR_LOG_INFO(d_debug_logger, msg.str());); + goto out_try_again; + } + } else { + return BLKD_OUT; + } } goto setup_call_to_work; // jump to common code @@ -278,14 +329,14 @@ block_executor::state block_executor::run_one_iteration() d_input_done.resize(d->ninputs()); d_output_items.resize(0); d_start_nitems_read.resize(d->ninputs()); - LOG(GR_LOG_INFO(d_debug_logger, "sink");); + // LOG(GR_LOG_INFO(d_debug_logger, "sink");); + LOG(std::ostringstream msg; msg << m << " -- sink"; + GR_LOG_INFO(d_debug_logger, msg.str());); max_items_avail = 0; for (int i = 0; i < d->ninputs(); i++) { { - /* - * Acquire the mutex and grab local copies of items_available and done. - */ + // Acquire the mutex and grab local copies of items_available and done. 
buffer_reader_sptr in_buf = d->input(i); gr::thread::scoped_lock guard(*in_buf->mutex()); d_ninput_items[i] = in_buf->items_available(); @@ -293,10 +344,10 @@ block_executor::state block_executor::run_one_iteration() } LOG(std::ostringstream msg; - msg << "d_ninput_items[" << i << "] = " << d_ninput_items[i]; + msg << m << " -- d_ninput_items[" << i << "] = " << d_ninput_items[i]; GR_LOG_INFO(d_debug_logger, msg.str());); LOG(std::ostringstream msg; - msg << "d_input_done[" << i << "] = " << d_input_done[i]; + msg << m << " -- d_input_done[" << i << "] = " << d_input_done[i]; GR_LOG_INFO(d_debug_logger, msg.str());); if (d_ninput_items[i] < m->output_multiple() && d_input_done[i]) @@ -309,13 +360,16 @@ block_executor::state block_executor::run_one_iteration() noutput_items = (int)(max_items_avail * m->relative_rate()); noutput_items = round_down(noutput_items, m->output_multiple()); noutput_items = std::min(noutput_items, max_noutput_items); - LOG(std::ostringstream msg; msg << "max_items_avail = " << max_items_avail; + LOG(std::ostringstream msg; + msg << m << " -- max_items_avail = " << max_items_avail; GR_LOG_INFO(d_debug_logger, msg.str());); - LOG(std::ostringstream msg; msg << "noutput_items = " << noutput_items; + LOG(std::ostringstream msg; msg << m << " -- noutput_items = " << noutput_items; GR_LOG_INFO(d_debug_logger, msg.str());); if (noutput_items == 0) { // we're blocked on input - LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN");); + // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN");); + LOG(std::ostringstream msg; msg << m << " -- BLKD_IN"; + GR_LOG_INFO(d_debug_logger, msg.str())); return BLKD_IN; } @@ -331,12 +385,11 @@ block_executor::state block_executor::run_one_iteration() d_output_items.resize(d->noutputs()); d_start_nitems_read.resize(d->ninputs()); + blkd_in_try_again: max_items_avail = 0; for (int i = 0; i < d->ninputs(); i++) { { - /* - * Acquire the mutex and grab local copies of items_available and done. 
- */ + // Acquire the mutex and grab local copies of items_available and done. buffer_reader_sptr in_buf = d->input(i); gr::thread::scoped_lock guard(*in_buf->mutex()); d_ninput_items[i] = in_buf->items_available(); @@ -346,11 +399,13 @@ block_executor::state block_executor::run_one_iteration() } // determine the minimum available output space - noutput_items = - min_available_space(d, m->output_multiple(), m->min_noutput_items()); + output_idx = 0; + out_try_again2: + noutput_items = min_available_space( + m, d, m->output_multiple(), m->min_noutput_items(), output_idx); if (ENABLE_LOGGING) { std::ostringstream msg; - msg << "regular "; + msg << m << " -- regular "; msg << m->relative_rate_i() << ":" << m->relative_rate_d(); msg << " max_items_avail = " << max_items_avail; msg << " noutput_items = " << noutput_items; @@ -360,13 +415,36 @@ block_executor::state block_executor::run_one_iteration() goto were_done; if (noutput_items == 0) { // we're output blocked - LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT");); - return BLKD_OUT; + // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT");); + LOG(std::ostringstream msg; msg << m << " -- BLKD_OUT"; + GR_LOG_INFO(d_debug_logger, msg.str())); + + buffer_sptr out_buf = d->output(output_idx); + + if (out_buf->output_blkd_cb_ready(m->output_multiple())) { + // Call the output blocked callback which will tell us if it was + // able to unblock the output + gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf); + if (!out_buf->output_blocked_callback(m->output_multiple())) { + LOG(std::ostringstream msg; + msg << m << " -- BLKD_OUT -- ([2] callback FAILED)"; + GR_LOG_INFO(d_debug_logger, msg.str());); + return BLKD_OUT; + } else { + LOG(std::ostringstream msg; + msg << m << " -- BLKD_OUT -- ([2] try again idx: " << output_idx + << ")"; + GR_LOG_INFO(d_debug_logger, msg.str());); + goto out_try_again2; + } + } else { + return BLKD_OUT; + } } try_again: if (m->fixed_rate()) { - // try to work it forward starting with max_items_avail. 
+ // Try to work it forward starting with max_items_avail. // We want to try to consume all the input we've got. int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail); @@ -416,6 +494,10 @@ block_executor::state block_executor::run_one_iteration() // ask the block how much input they need to produce noutput_items m->forecast(noutput_items, d_ninput_items_required); + LOG(std::ostringstream msg; + msg << m << " -- FCAST noutput_items=" << noutput_items << " inputs_required=" + << d_ninput_items_required[0] << " inputs_avail=" << d_ninput_items[0]; + GR_LOG_INFO(d_debug_logger, msg.str())); // See if we've got sufficient input available and make sure we // didn't overflow on the input. @@ -445,12 +527,29 @@ block_executor::state block_executor::run_one_iteration() } // We're blocked on input - LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN");); + LOG(std::ostringstream msg; msg << m << " -- BLKD_IN"; + GR_LOG_INFO(d_debug_logger, msg.str())); + + buffer_reader_sptr in_buf = d->input(i); + + LOG(std::ostringstream msg; + msg << m << " (t: " << this << ") -- pre-callback"; + GR_LOG_DEBUG(d_debug_logger, msg.str())); + + if (in_buf->input_blkd_cb_ready(d_ninput_items_required[i])) { + gr::custom_lock lock(std::ref(*in_buf->mutex()), in_buf->buffer()); + if (in_buf->input_blocked_callback(d_ninput_items_required[i], + d_ninput_items[i])) { + LOG(std::ostringstream msg; msg << m << " -- BLKD_IN -- TRY AGAIN"; + GR_LOG_INFO(d_debug_logger, msg.str())); + goto blkd_in_try_again; + } + } + if (d_input_done[i]) // If the upstream block is done, we're done goto were_done; // Is it possible to ever fulfill this request? - buffer_reader_sptr in_buf = d->input(i); if (d_ninput_items_required[i] > in_buf->max_possible_items_available()) { // Nope, never going to happen... std::ostringstream msg; @@ -476,14 +575,18 @@ block_executor::state block_executor::run_one_iteration() // We've got enough data on each input to produce noutput_items. 
// Finish setting up the call to work. - for (int i = 0; i < d->ninputs(); i++) + for (int i = 0; i < d->ninputs(); i++) { + d->input(i)->buffer()->increment_active(); d_input_items[i] = d->input(i)->read_pointer(); + } setup_call_to_work: d->d_produce_or = 0; - for (int i = 0; i < d->noutputs(); i++) + for (int i = 0; i < d->noutputs(); i++) { + d->output(i)->increment_active(); d_output_items[i] = d->output(i)->write_pointer(); + } // determine where to start looking for new tags for (int i = 0; i < d->ninputs(); i++) @@ -504,7 +607,10 @@ block_executor::state block_executor::run_one_iteration() #endif /* GR_PERFORMANCE_COUNTERS */ LOG(std::ostringstream msg; - msg << "general_work: noutput_items = " << noutput_items << " result = " << n; + msg << m << " -- general_work: noutput_items = " << noutput_items + << " ninput_items=" << (d->ninputs() >= 1 ? d_ninput_items[0] : 0) + << " ninput_req=" << (d->ninputs() >= 1 ? d_ninput_items_required[0] : 0) + << " result = " << n; GR_LOG_INFO(d_debug_logger, msg.str());); // Adjust number of unaligned items left to process @@ -521,14 +627,21 @@ block_executor::state block_executor::run_one_iteration() m->mp_relative_rate(), m->update_rate(), d_returned_tags, - m->unique_id())) + m->unique_id())) { + d->post_work_cleanup(); goto were_done; + } - if (n == block::WORK_DONE) + if (n == block::WORK_DONE) { + d->post_work_cleanup(); goto were_done; + } - if (n != block::WORK_CALLED_PRODUCE) + if (n != block::WORK_CALLED_PRODUCE) { d->produce_each(n); // advance write pointers + } + + d->post_work_cleanup(); // For some blocks that can change their produce/consume ratio // (the relative_rate), we might want to automatically update @@ -558,13 +671,30 @@ block_executor::state block_executor::run_one_iteration() } */ + // The call to general_work() produced no output therefore the block may + // be "effectively output blocked". Call the output blocked callback + // just in case, it can do no harm. 
+ for (int i = 0; i < d->noutputs(); i++) { + buffer_sptr out_buf = d->output(i); + LOG(std::ostringstream msg; + msg << m << " -- NO OUTPUT -- [" << i << "] -- OUTPUT BLOCKED"; + GR_LOG_DEBUG(d_debug_logger, msg.str());); + gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf); + out_buf->output_blocked_callback(m->output_multiple(), true); + LOG(std::ostringstream msg; msg << m << " -- NO OUTPUT -- [" << i + << "] -- OUTPUT BLOCKED CBACK: " << rc; + GR_LOG_DEBUG(d_debug_logger, msg.str());); + } + // Have the caller try again... return READY_NO_OUTPUT; } GR_LOG_ERROR(d_logger, "invalid state while going through iteration state machine"); were_done: - LOG(GR_LOG_INFO(d_debug_logger, "we're done");); + // LOG(GR_LOG_INFO(d_debug_logger, "we're done");); + LOG(std::ostringstream msg; msg << m << " -- we're done"; + GR_LOG_INFO(d_debug_logger, msg.str())); d->set_done(true); return DONE; } |