summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/lib/block_executor.cc
diff options
context:
space:
mode:
authorDavid Sorber <david.sorber@blacklynx.tech>2021-05-12 08:59:21 -0400
committermormj <34754695+mormj@users.noreply.github.com>2021-10-25 11:27:01 -0400
commit788827ae116bef871e144abd39b1e4482208eabe (patch)
treedcfee04a77db5bb3c8042be5b0b95c54bf8759c9 /gnuradio-runtime/lib/block_executor.cc
parentb8713810a2d07ac1a632bd7bfb23f3f48f67e222 (diff)
runtime: Custom Buffer/Accelerator Device Support - Milestone 1
Custom Buffer/Accelerator Device Support - Milestone 1 changes: * Refactored existing single mapped buffer code and created single mapped buffer abstraction; wrapping within single mapped buffers is handled explicitly by input blocked and output blocked callbacks that are called from block_executor * Added simple custom buffer allocation interface (NOTE: this interface will change for milestone 2) * Accelerated blocks are still responsible for data transfer but the custom buffer interface eliminates the double copy problem Signed-off-by: David Sorber <david.sorber@blacklynx.tech>
Diffstat (limited to 'gnuradio-runtime/lib/block_executor.cc')
-rw-r--r--gnuradio-runtime/lib/block_executor.cc206
1 files changed, 168 insertions, 38 deletions
diff --git a/gnuradio-runtime/lib/block_executor.cc b/gnuradio-runtime/lib/block_executor.cc
index c026934bd9..6fbf2c5c17 100644
--- a/gnuradio-runtime/lib/block_executor.cc
+++ b/gnuradio-runtime/lib/block_executor.cc
@@ -14,13 +14,13 @@
#include <gnuradio/block.h>
#include <gnuradio/block_detail.h>
-#include <gnuradio/buffer.h>
-#include <gnuradio/logger.h>
+#include <gnuradio/custom_lock.h>
#include <gnuradio/prefs.h>
#include <block_executor.h>
#include <limits>
#include <sstream>
+
namespace gr {
// must be defined to either 0 or 1
@@ -53,27 +53,55 @@ inline static unsigned int round_down(unsigned int n, unsigned int multiple)
// buffers or -1 if we're output blocked and the output we're
// blocked on is done.
//
-static int
-min_available_space(block_detail* d, int output_multiple, int min_noutput_items)
+static int min_available_space(block* m,
+ block_detail* d,
+ int output_multiple,
+ int min_noutput_items,
+ int& output_idx)
{
+ gr::logger_ptr logger;
+ gr::logger_ptr debug_logger;
+ gr::configure_default_loggers(logger, debug_logger, "min_available_space");
+
int min_space = std::numeric_limits<int>::max();
if (min_noutput_items == 0)
min_noutput_items = 1;
- for (int i = 0; i < d->noutputs(); i++) {
+ for (int i = output_idx; i < d->noutputs(); i++) {
buffer_sptr out_buf = d->output(i);
gr::thread::scoped_lock guard(*out_buf->mutex());
- int avail_n = round_down(out_buf->space_available(), output_multiple);
+ int space_avail = out_buf->space_available();
+ int avail_n = round_down(space_avail, output_multiple);
+ // If not strictly output multiple size aligned, potentially use all
+ // space available in the output buffer
+ if (avail_n == 0 && space_avail < output_multiple && !m->output_multiple_set()) {
+ avail_n = std::max(avail_n, space_avail);
+ }
int best_n = round_down(out_buf->bufsize() / 2, output_multiple);
if (best_n < min_noutput_items)
throw std::runtime_error("Buffer too small for min_noutput_items");
int n = std::min(avail_n, best_n);
if (n < min_noutput_items) { // We're blocked on output.
- if (out_buf->done()) { // Downstream is done, therefore we're done.
+ LOG(std::ostringstream msg;
+ msg << m << " **[i=" << i << "] output_multiple=" << output_multiple
+ << " min_noutput_items=" << min_noutput_items
+ << " avail_n=" << avail_n << " best_n=" << best_n << " n=" << n
+ << " min_space=" << min_space << " outbuf_done=" << out_buf->done();
+ GR_LOG_INFO(debug_logger, msg.str()););
+
+ if (out_buf->done()) { // Downstream is done, therefore we're done.
return -1;
}
+
+ output_idx = i;
return 0;
}
min_space = std::min(min_space, n);
+
+ LOG(std::ostringstream msg;
+ msg << m << " [i=" << i << "] output_multiple=" << output_multiple
+ << " min_noutput_items=" << min_noutput_items << " avail_n=" << avail_n
+ << " best_n=" << best_n << " n=" << n << " min_space=" << min_space;
+ GR_LOG_INFO(debug_logger, msg.str()););
}
return min_space;
}
@@ -233,6 +261,7 @@ block_executor::state block_executor::run_one_iteration()
int max_noutput_items;
int new_alignment = 0;
int alignment_state = -1;
+ int output_idx = 0;
block* m = d_block.get();
block_detail* d = m->detail().get();
@@ -255,8 +284,10 @@ block_executor::state block_executor::run_one_iteration()
d_start_nitems_read.resize(0);
// determine the minimum available output space
- noutput_items =
- min_available_space(d, m->output_multiple(), m->min_noutput_items());
+ output_idx = 0;
+ out_try_again:
+ noutput_items = min_available_space(
+ m, d, m->output_multiple(), m->min_noutput_items(), output_idx);
noutput_items = std::min(noutput_items, max_noutput_items);
LOG(std::ostringstream msg; msg << "source: noutput_items = " << noutput_items;
GR_LOG_INFO(d_debug_logger, msg.str()););
@@ -264,8 +295,28 @@ block_executor::state block_executor::run_one_iteration()
goto were_done;
if (noutput_items == 0) { // we're output blocked
- LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT"););
- return BLKD_OUT;
+ LOG(std::ostringstream msg; msg << m << " -- BLKD_OUT";
+ GR_LOG_INFO(d_debug_logger, msg.str()););
+
+ buffer_sptr out_buf = d->output(output_idx);
+
+ if (out_buf->output_blkd_cb_ready(m->output_multiple())) {
+ gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf);
+ if (!out_buf->output_blocked_callback(m->output_multiple())) {
+ LOG(std::ostringstream msg;
+ msg << m << " -- BLKD_OUT -- ([1] callback FAILED)";
+ GR_LOG_INFO(d_debug_logger, msg.str()););
+ return BLKD_OUT;
+ } else {
+ LOG(std::ostringstream msg;
+ msg << m << " -- BLKD_OUT -- ([1] try again idx: " << output_idx
+ << ")";
+ GR_LOG_INFO(d_debug_logger, msg.str()););
+ goto out_try_again;
+ }
+ } else {
+ return BLKD_OUT;
+ }
}
goto setup_call_to_work; // jump to common code
@@ -278,14 +329,14 @@ block_executor::state block_executor::run_one_iteration()
d_input_done.resize(d->ninputs());
d_output_items.resize(0);
d_start_nitems_read.resize(d->ninputs());
- LOG(GR_LOG_INFO(d_debug_logger, "sink"););
+ // LOG(GR_LOG_INFO(d_debug_logger, "sink"););
+ LOG(std::ostringstream msg; msg << m << " -- sink";
+ GR_LOG_INFO(d_debug_logger, msg.str()););
max_items_avail = 0;
for (int i = 0; i < d->ninputs(); i++) {
{
- /*
- * Acquire the mutex and grab local copies of items_available and done.
- */
+ // Acquire the mutex and grab local copies of items_available and done.
buffer_reader_sptr in_buf = d->input(i);
gr::thread::scoped_lock guard(*in_buf->mutex());
d_ninput_items[i] = in_buf->items_available();
@@ -293,10 +344,10 @@ block_executor::state block_executor::run_one_iteration()
}
LOG(std::ostringstream msg;
- msg << "d_ninput_items[" << i << "] = " << d_ninput_items[i];
+ msg << m << " -- d_ninput_items[" << i << "] = " << d_ninput_items[i];
GR_LOG_INFO(d_debug_logger, msg.str()););
LOG(std::ostringstream msg;
- msg << "d_input_done[" << i << "] = " << d_input_done[i];
+ msg << m << " -- d_input_done[" << i << "] = " << d_input_done[i];
GR_LOG_INFO(d_debug_logger, msg.str()););
if (d_ninput_items[i] < m->output_multiple() && d_input_done[i])
@@ -309,13 +360,16 @@ block_executor::state block_executor::run_one_iteration()
noutput_items = (int)(max_items_avail * m->relative_rate());
noutput_items = round_down(noutput_items, m->output_multiple());
noutput_items = std::min(noutput_items, max_noutput_items);
- LOG(std::ostringstream msg; msg << "max_items_avail = " << max_items_avail;
+ LOG(std::ostringstream msg;
+ msg << m << " -- max_items_avail = " << max_items_avail;
GR_LOG_INFO(d_debug_logger, msg.str()););
- LOG(std::ostringstream msg; msg << "noutput_items = " << noutput_items;
+ LOG(std::ostringstream msg; msg << m << " -- noutput_items = " << noutput_items;
GR_LOG_INFO(d_debug_logger, msg.str()););
if (noutput_items == 0) { // we're blocked on input
- LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN"););
+ // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN"););
+ LOG(std::ostringstream msg; msg << m << " -- BLKD_IN";
+ GR_LOG_INFO(d_debug_logger, msg.str()));
return BLKD_IN;
}
@@ -331,12 +385,11 @@ block_executor::state block_executor::run_one_iteration()
d_output_items.resize(d->noutputs());
d_start_nitems_read.resize(d->ninputs());
+ blkd_in_try_again:
max_items_avail = 0;
for (int i = 0; i < d->ninputs(); i++) {
{
- /*
- * Acquire the mutex and grab local copies of items_available and done.
- */
+ // Acquire the mutex and grab local copies of items_available and done.
buffer_reader_sptr in_buf = d->input(i);
gr::thread::scoped_lock guard(*in_buf->mutex());
d_ninput_items[i] = in_buf->items_available();
@@ -346,11 +399,13 @@ block_executor::state block_executor::run_one_iteration()
}
// determine the minimum available output space
- noutput_items =
- min_available_space(d, m->output_multiple(), m->min_noutput_items());
+ output_idx = 0;
+ out_try_again2:
+ noutput_items = min_available_space(
+ m, d, m->output_multiple(), m->min_noutput_items(), output_idx);
if (ENABLE_LOGGING) {
std::ostringstream msg;
- msg << "regular ";
+ msg << m << " -- regular ";
msg << m->relative_rate_i() << ":" << m->relative_rate_d();
msg << " max_items_avail = " << max_items_avail;
msg << " noutput_items = " << noutput_items;
@@ -360,13 +415,36 @@ block_executor::state block_executor::run_one_iteration()
goto were_done;
if (noutput_items == 0) { // we're output blocked
- LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT"););
- return BLKD_OUT;
+ // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT"););
+ LOG(std::ostringstream msg; msg << m << " -- BLKD_OUT";
+ GR_LOG_INFO(d_debug_logger, msg.str()));
+
+ buffer_sptr out_buf = d->output(output_idx);
+
+ if (out_buf->output_blkd_cb_ready(m->output_multiple())) {
+ // Call the output blocked callback which will tell us if it was
+ // able to unblock the output
+ gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf);
+ if (!out_buf->output_blocked_callback(m->output_multiple())) {
+ LOG(std::ostringstream msg;
+ msg << m << " -- BLKD_OUT -- ([2] callback FAILED)";
+ GR_LOG_INFO(d_debug_logger, msg.str()););
+ return BLKD_OUT;
+ } else {
+ LOG(std::ostringstream msg;
+ msg << m << " -- BLKD_OUT -- ([2] try again idx: " << output_idx
+ << ")";
+ GR_LOG_INFO(d_debug_logger, msg.str()););
+ goto out_try_again2;
+ }
+ } else {
+ return BLKD_OUT;
+ }
}
try_again:
if (m->fixed_rate()) {
- // try to work it forward starting with max_items_avail.
+ // Try to work it forward starting with max_items_avail.
// We want to try to consume all the input we've got.
int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
@@ -416,6 +494,10 @@ block_executor::state block_executor::run_one_iteration()
// ask the block how much input they need to produce noutput_items
m->forecast(noutput_items, d_ninput_items_required);
+ LOG(std::ostringstream msg;
+ msg << m << " -- FCAST noutput_items=" << noutput_items << " inputs_required="
+ << d_ninput_items_required[0] << " inputs_avail=" << d_ninput_items[0];
+ GR_LOG_INFO(d_debug_logger, msg.str()));
// See if we've got sufficient input available and make sure we
// didn't overflow on the input.
@@ -445,12 +527,29 @@ block_executor::state block_executor::run_one_iteration()
}
// We're blocked on input
- LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN"););
+ LOG(std::ostringstream msg; msg << m << " -- BLKD_IN";
+ GR_LOG_INFO(d_debug_logger, msg.str()));
+
+ buffer_reader_sptr in_buf = d->input(i);
+
+ LOG(std::ostringstream msg;
+ msg << m << " (t: " << this << ") -- pre-callback";
+ GR_LOG_DEBUG(d_debug_logger, msg.str()));
+
+ if (in_buf->input_blkd_cb_ready(d_ninput_items_required[i])) {
+ gr::custom_lock lock(std::ref(*in_buf->mutex()), in_buf->buffer());
+ if (in_buf->input_blocked_callback(d_ninput_items_required[i],
+ d_ninput_items[i])) {
+ LOG(std::ostringstream msg; msg << m << " -- BLKD_IN -- TRY AGAIN";
+ GR_LOG_INFO(d_debug_logger, msg.str()));
+ goto blkd_in_try_again;
+ }
+ }
+
if (d_input_done[i]) // If the upstream block is done, we're done
goto were_done;
// Is it possible to ever fulfill this request?
- buffer_reader_sptr in_buf = d->input(i);
if (d_ninput_items_required[i] > in_buf->max_possible_items_available()) {
// Nope, never going to happen...
std::ostringstream msg;
@@ -476,14 +575,18 @@ block_executor::state block_executor::run_one_iteration()
// We've got enough data on each input to produce noutput_items.
// Finish setting up the call to work.
- for (int i = 0; i < d->ninputs(); i++)
+ for (int i = 0; i < d->ninputs(); i++) {
+ d->input(i)->buffer()->increment_active();
d_input_items[i] = d->input(i)->read_pointer();
+ }
setup_call_to_work:
d->d_produce_or = 0;
- for (int i = 0; i < d->noutputs(); i++)
+ for (int i = 0; i < d->noutputs(); i++) {
+ d->output(i)->increment_active();
d_output_items[i] = d->output(i)->write_pointer();
+ }
// determine where to start looking for new tags
for (int i = 0; i < d->ninputs(); i++)
@@ -504,7 +607,10 @@ block_executor::state block_executor::run_one_iteration()
#endif /* GR_PERFORMANCE_COUNTERS */
LOG(std::ostringstream msg;
- msg << "general_work: noutput_items = " << noutput_items << " result = " << n;
+ msg << m << " -- general_work: noutput_items = " << noutput_items
+ << " ninput_items=" << (d->ninputs() >= 1 ? d_ninput_items[0] : 0)
+ << " ninput_req=" << (d->ninputs() >= 1 ? d_ninput_items_required[0] : 0)
+ << " result = " << n;
GR_LOG_INFO(d_debug_logger, msg.str()););
// Adjust number of unaligned items left to process
@@ -521,14 +627,21 @@ block_executor::state block_executor::run_one_iteration()
m->mp_relative_rate(),
m->update_rate(),
d_returned_tags,
- m->unique_id()))
+ m->unique_id())) {
+ d->post_work_cleanup();
goto were_done;
+ }
- if (n == block::WORK_DONE)
+ if (n == block::WORK_DONE) {
+ d->post_work_cleanup();
goto were_done;
+ }
- if (n != block::WORK_CALLED_PRODUCE)
+ if (n != block::WORK_CALLED_PRODUCE) {
d->produce_each(n); // advance write pointers
+ }
+
+ d->post_work_cleanup();
// For some blocks that can change their produce/consume ratio
// (the relative_rate), we might want to automatically update
@@ -558,13 +671,30 @@ block_executor::state block_executor::run_one_iteration()
}
*/
+ // The call to general_work() produced no output therefore the block may
+ // be "effectively output blocked". Call the output blocked callback
+ // just in case, it can do no harm.
+ for (int i = 0; i < d->noutputs(); i++) {
+ buffer_sptr out_buf = d->output(i);
+ LOG(std::ostringstream msg;
+ msg << m << " -- NO OUTPUT -- [" << i << "] -- OUTPUT BLOCKED";
+ GR_LOG_DEBUG(d_debug_logger, msg.str()););
+ gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf);
+ out_buf->output_blocked_callback(m->output_multiple(), true);
+ LOG(std::ostringstream msg; msg << m << " -- NO OUTPUT -- [" << i
+ << "] -- OUTPUT BLOCKED CBACK: " << rc;
+ GR_LOG_DEBUG(d_debug_logger, msg.str()););
+ }
+
// Have the caller try again...
return READY_NO_OUTPUT;
}
GR_LOG_ERROR(d_logger, "invalid state while going through iteration state machine");
were_done:
- LOG(GR_LOG_INFO(d_debug_logger, "we're done"););
+ // LOG(GR_LOG_INFO(d_debug_logger, "we're done"););
+ LOG(std::ostringstream msg; msg << m << " -- we're done";
+ GR_LOG_INFO(d_debug_logger, msg.str()));
d->set_done(true);
return DONE;
}