diff options
author | David Sorber <david.sorber@blacklynx.tech> | 2021-05-12 08:59:21 -0400 |
---|---|---|
committer | mormj <34754695+mormj@users.noreply.github.com> | 2021-10-25 11:27:01 -0400 |
commit | 788827ae116bef871e144abd39b1e4482208eabe (patch) | |
tree | dcfee04a77db5bb3c8042be5b0b95c54bf8759c9 /gnuradio-runtime/lib | |
parent | b8713810a2d07ac1a632bd7bfb23f3f48f67e222 (diff) |
runtime: Custom Buffer/Accelerator Device Support - Milestone 1
Custom Buffer/Accelerator Device Support - Milestone 1 changes:
* Refactored existing single mapped buffer code and created single
mapped buffer abstraction; wrapping within single mapped buffers
is handled explicitly by input blocked and output blocked
callbacks that are called from block_executor
* Added simple custom buffer allocation interface (NOTE: this
interface will change for milestone 2)
* Accelerated blocks are still responsible for data transfer but the
custom buffer interface eliminates the double copy problem
Signed-off-by: David Sorber <david.sorber@blacklynx.tech>
Diffstat (limited to 'gnuradio-runtime/lib')
-rw-r--r-- | gnuradio-runtime/lib/CMakeLists.txt | 5 | ||||
-rw-r--r-- | gnuradio-runtime/lib/block.cc | 141 | ||||
-rw-r--r-- | gnuradio-runtime/lib/block_detail.cc | 1 | ||||
-rw-r--r-- | gnuradio-runtime/lib/block_executor.cc | 206 | ||||
-rw-r--r-- | gnuradio-runtime/lib/buffer.cc | 273 | ||||
-rw-r--r-- | gnuradio-runtime/lib/buffer_double_mapped.cc | 155 | ||||
-rw-r--r-- | gnuradio-runtime/lib/buffer_reader.cc | 180 | ||||
-rw-r--r-- | gnuradio-runtime/lib/buffer_reader_sm.cc | 149 | ||||
-rw-r--r-- | gnuradio-runtime/lib/buffer_single_mapped.cc | 297 | ||||
-rw-r--r-- | gnuradio-runtime/lib/buffer_type.cc | 18 | ||||
-rw-r--r-- | gnuradio-runtime/lib/flat_flowgraph.cc | 206 | ||||
-rw-r--r-- | gnuradio-runtime/lib/flat_flowgraph.h | 3 | ||||
-rw-r--r-- | gnuradio-runtime/lib/qa_buffer.cc | 14 | ||||
-rw-r--r-- | gnuradio-runtime/lib/tpb_detail.cc | 1 |
14 files changed, 1336 insertions, 313 deletions
diff --git a/gnuradio-runtime/lib/CMakeLists.txt b/gnuradio-runtime/lib/CMakeLists.txt index 42486de801..ca96549386 100644 --- a/gnuradio-runtime/lib/CMakeLists.txt +++ b/gnuradio-runtime/lib/CMakeLists.txt @@ -55,6 +55,11 @@ add_library(gnuradio-runtime block_gateway_impl.cc block_registry.cc buffer.cc + buffer_double_mapped.cc + buffer_reader.cc + buffer_reader_sm.cc + buffer_single_mapped.cc + buffer_type.cc flat_flowgraph.cc flowgraph.cc hier_block2.cc diff --git a/gnuradio-runtime/lib/block.cc b/gnuradio-runtime/lib/block.cc index cddf5a5baa..bb6ce95298 100644 --- a/gnuradio-runtime/lib/block.cc +++ b/gnuradio-runtime/lib/block.cc @@ -24,6 +24,12 @@ namespace gr { +// Moved from flat_flowgraph.cc +// 32Kbyte buffer size between blocks +#define GR_FIXED_BUFFER_SIZE (32 * (1L << 10)) + +static const unsigned int s_fixed_buffer_size = GR_FIXED_BUFFER_SIZE; + block::block(const std::string& name, io_signature::sptr input_signature, io_signature::sptr output_signature) @@ -375,11 +381,146 @@ void block::set_min_output_buffer(int port, long min_output_buffer) d_min_output_buffer[port] = min_output_buffer; } +void block::allocate_detail(int ninputs, + int noutputs, + const std::vector<int>& downstream_max_nitems_vec, + const std::vector<uint64_t>& downstream_lcm_nitems_vec) +{ + block_detail_sptr detail = make_block_detail(ninputs, noutputs); + + GR_LOG_DEBUG(d_debug_logger, "Creating block detail for " + identifier()); + + for (int i = 0; i < noutputs; i++) { + expand_minmax_buffer(i); + + buffer_sptr buffer = allocate_buffer( + i, downstream_max_nitems_vec[i], downstream_lcm_nitems_vec[i]); + GR_LOG_DEBUG(d_debug_logger, + "Allocated buffer for output " + identifier() + " " + + std::to_string(i)); + detail->set_output(i, buffer); + + // Update the block's max_output_buffer based on what was actually allocated. + if ((max_output_buffer(i) != buffer->bufsize()) && (max_output_buffer(i) != -1)) + GR_LOG_WARN(d_logger, + boost::format("Block (%1%) max output buffer set to %2%" + " instead of requested %3%") % + alias() % buffer->bufsize() % max_output_buffer(i)); + set_max_output_buffer(i, buffer->bufsize()); + } + + // Store the block_detail that was created above + set_detail(detail); +} + +buffer_sptr block::replace_buffer(uint32_t out_port, block_sptr block_owner) +{ + block_detail_sptr detail_ = detail(); + buffer_sptr orig_buffer = detail_->output(out_port); + + // Make a new buffer but this time use the passed in block as the owner + buffer_sptr new_buffer = make_buffer(orig_buffer->bufsize(), + orig_buffer->get_sizeof_item(), + orig_buffer->get_downstream_lcm_nitems(), + shared_from_base<block>(), + block_owner); + + detail_->set_output(out_port, new_buffer); + return new_buffer; +} bool block::update_rate() const { return d_update_rate; } void block::enable_update_rate(bool en) { d_update_rate = en; } +buffer_sptr block::allocate_buffer(int port, + int downstream_max_nitems, + uint64_t downstream_lcm_nitems) +{ + int item_size = output_signature()->sizeof_stream_item(port); + + // *2 because we're now only filling them 1/2 way in order to + // increase the available parallelism when using the TPB scheduler. + // (We're double buffering, where we used to single buffer) + int nitems = s_fixed_buffer_size * 2 / item_size; + + // Make sure there are at least twice the output_multiple no. of items + if (nitems < 2 * output_multiple()) // Note: this means output_multiple() + nitems = 2 * output_multiple(); // can't be changed by block dynamically + + // Limit buffer size if indicated + if (max_output_buffer(port) > 0) { + // std::cout << "constraining output items to " << block->max_output_buffer(port) + // << "\n"; + nitems = std::min((long)nitems, (long)max_output_buffer(port)); + nitems -= nitems % output_multiple(); + if (nitems < 1) + throw std::runtime_error("problems allocating a buffer with the given max " + "output buffer constraint!"); + } else if (min_output_buffer(port) > 0) { + nitems = std::max((long)nitems, (long)min_output_buffer(port)); + nitems -= nitems % output_multiple(); + if (nitems < 1) + throw std::runtime_error("problems allocating a buffer with the given min " + "output buffer constraint!"); + } + + // If any downstream blocks are decimators and/or have a large output_multiple, + // ensure we have a buffer at least twice their decimation factor*output_multiple + nitems = std::max(nitems, downstream_max_nitems); + + // We're going to let this fail once and retry. If that fails, throw and exit. + buffer_sptr buf; + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + GR_LOG_DEBUG(d_logger, + "Block: " + name() + " allocated buffer for output " + identifier()); +#endif + + try { +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + std::ostringstream msg; + msg << "downstream_max_nitems: " << downstream_max_nitems + << " -- downstream_lcm_nitems: " << downstream_lcm_nitems + << " -- output_multiple(): " << output_multiple() + << " -- out_mult_set: " << output_multiple_set() << " -- nitems: " << nitems + << " -- history: " << history() << " -- relative_rate: " << relative_rate(); + if (relative_rate() != 1.0) { + msg << " (" << relative_rate_i() << " / " << relative_rate_d() << ")"; + } + msg << " -- fixed_rate: " << fixed_rate(); + if (fixed_rate()) { + int num_inputs = fixed_rate_noutput_to_ninput(1) - (history() - 1); + msg << " (" << num_inputs << " -> " + << fixed_rate_ninput_to_noutput(num_inputs + (history() - 1)) << ")"; + } + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + buf = make_buffer(nitems, + item_size, + downstream_lcm_nitems, + shared_from_base<block>(), + shared_from_base<block>()); + + } catch (std::bad_alloc&) { + buf = make_buffer(nitems, + item_size, + downstream_lcm_nitems, + shared_from_base<block>(), + shared_from_base<block>()); + } + + // Set the max noutput items size here to make sure it's always + // set in the block and available in the start() method. + // But don't overwrite if the user has set this externally. + if (!is_set_max_noutput_items()) + set_max_noutput_items(nitems); + + return buf; +} + float block::pc_noutput_items() { if (d_detail) { diff --git a/gnuradio-runtime/lib/block_detail.cc b/gnuradio-runtime/lib/block_detail.cc index 239223603d..f5283c56b0 100644 --- a/gnuradio-runtime/lib/block_detail.cc +++ b/gnuradio-runtime/lib/block_detail.cc @@ -14,6 +14,7 @@ #include <gnuradio/block_detail.h> #include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader.h> #include <gnuradio/logger.h> namespace gr { diff --git a/gnuradio-runtime/lib/block_executor.cc b/gnuradio-runtime/lib/block_executor.cc index c026934bd9..6fbf2c5c17 100644 --- a/gnuradio-runtime/lib/block_executor.cc +++ b/gnuradio-runtime/lib/block_executor.cc @@ -14,13 +14,13 @@ #include <gnuradio/block.h> #include <gnuradio/block_detail.h> -#include <gnuradio/buffer.h> -#include <gnuradio/logger.h> +#include <gnuradio/custom_lock.h> #include <gnuradio/prefs.h> #include <block_executor.h> #include <limits> #include <sstream> + namespace gr { // must be defined to either 0 or 1 @@ -53,27 +53,55 @@ inline static unsigned int round_down(unsigned int n, unsigned int multiple) // buffers or -1 if we're output blocked and the output we're // blocked on is done. // -static int -min_available_space(block_detail* d, int output_multiple, int min_noutput_items) +static int min_available_space(block* m, + block_detail* d, + int output_multiple, + int min_noutput_items, + int& output_idx) { + gr::logger_ptr logger; + gr::logger_ptr debug_logger; + gr::configure_default_loggers(logger, debug_logger, "min_available_space"); + int min_space = std::numeric_limits<int>::max(); if (min_noutput_items == 0) min_noutput_items = 1; - for (int i = 0; i < d->noutputs(); i++) { + for (int i = output_idx; i < d->noutputs(); i++) { buffer_sptr out_buf = d->output(i); gr::thread::scoped_lock guard(*out_buf->mutex()); - int avail_n = round_down(out_buf->space_available(), output_multiple); + int space_avail = out_buf->space_available(); + int avail_n = round_down(space_avail, output_multiple); + // If not strictly output multiple size aligned, potentially use all + // space available in the output buffer + if (avail_n == 0 && space_avail < output_multiple && !m->output_multiple_set()) { + avail_n = std::max(avail_n, space_avail); + } int best_n = round_down(out_buf->bufsize() / 2, output_multiple); if (best_n < min_noutput_items) throw std::runtime_error("Buffer too small for min_noutput_items"); int n = std::min(avail_n, best_n); if (n < min_noutput_items) { // We're blocked on output. - if (out_buf->done()) { // Downstream is done, therefore we're done. + LOG(std::ostringstream msg; + msg << m << " **[i=" << i << "] output_multiple=" << output_multiple + << " min_noutput_items=" << min_noutput_items + << " avail_n=" << avail_n << " best_n=" << best_n << " n=" << n + << " min_space=" << min_space << " outbuf_done=" << out_buf->done(); + GR_LOG_INFO(debug_logger, msg.str());); + + if (out_buf->done()) { // Downstream is done, therefore we're done. return -1; } + + output_idx = i; return 0; } min_space = std::min(min_space, n); + + LOG(std::ostringstream msg; + msg << m << " [i=" << i << "] output_multiple=" << output_multiple + << " min_noutput_items=" << min_noutput_items << " avail_n=" << avail_n + << " best_n=" << best_n << " n=" << n << " min_space=" << min_space; + GR_LOG_INFO(debug_logger, msg.str());); } return min_space; } @@ -233,6 +261,7 @@ block_executor::state block_executor::run_one_iteration() int max_noutput_items; int new_alignment = 0; int alignment_state = -1; + int output_idx = 0; block* m = d_block.get(); block_detail* d = m->detail().get(); @@ -255,8 +284,10 @@ block_executor::state block_executor::run_one_iteration() d_start_nitems_read.resize(0); // determine the minimum available output space - noutput_items = - min_available_space(d, m->output_multiple(), m->min_noutput_items()); + output_idx = 0; + out_try_again: + noutput_items = min_available_space( + m, d, m->output_multiple(), m->min_noutput_items(), output_idx); noutput_items = std::min(noutput_items, max_noutput_items); LOG(std::ostringstream msg; msg << "source: noutput_items = " << noutput_items; GR_LOG_INFO(d_debug_logger, msg.str());); @@ -264,8 +295,28 @@ block_executor::state block_executor::run_one_iteration() goto were_done; if (noutput_items == 0) { // we're output blocked - LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT");); - return BLKD_OUT; + LOG(std::ostringstream msg; msg << m << " -- BLKD_OUT"; + GR_LOG_INFO(d_debug_logger, msg.str());); + + buffer_sptr out_buf = d->output(output_idx); + + if (out_buf->output_blkd_cb_ready(m->output_multiple())) { + gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf); + if (!out_buf->output_blocked_callback(m->output_multiple())) { + LOG(std::ostringstream msg; + msg << m << " -- BLKD_OUT -- ([1] callback FAILED)"; + GR_LOG_INFO(d_debug_logger, msg.str());); + return BLKD_OUT; + } else { + LOG(std::ostringstream msg; + msg << m << " -- BLKD_OUT -- ([1] try again idx: " << output_idx + << ")"; + GR_LOG_INFO(d_debug_logger, msg.str());); + goto out_try_again; + } + } else { + return BLKD_OUT; + } } goto setup_call_to_work; // jump to common code @@ -278,14 +329,14 @@ block_executor::state block_executor::run_one_iteration() d_input_done.resize(d->ninputs()); d_output_items.resize(0); d_start_nitems_read.resize(d->ninputs()); - LOG(GR_LOG_INFO(d_debug_logger, "sink");); + // LOG(GR_LOG_INFO(d_debug_logger, "sink");); + LOG(std::ostringstream msg; msg << m << " -- sink"; + GR_LOG_INFO(d_debug_logger, msg.str());); max_items_avail = 0; for (int i = 0; i < d->ninputs(); i++) { { - /* - * Acquire the mutex and grab local copies of items_available and done. - */ + // Acquire the mutex and grab local copies of items_available and done. buffer_reader_sptr in_buf = d->input(i); gr::thread::scoped_lock guard(*in_buf->mutex()); d_ninput_items[i] = in_buf->items_available(); @@ -293,10 +344,10 @@ block_executor::state block_executor::run_one_iteration() } LOG(std::ostringstream msg; - msg << "d_ninput_items[" << i << "] = " << d_ninput_items[i]; + msg << m << " -- d_ninput_items[" << i << "] = " << d_ninput_items[i]; GR_LOG_INFO(d_debug_logger, msg.str());); LOG(std::ostringstream msg; - msg << "d_input_done[" << i << "] = " << d_input_done[i]; + msg << m << " -- d_input_done[" << i << "] = " << d_input_done[i]; GR_LOG_INFO(d_debug_logger, msg.str());); if (d_ninput_items[i] < m->output_multiple() && d_input_done[i]) @@ -309,13 +360,16 @@ block_executor::state block_executor::run_one_iteration() noutput_items = (int)(max_items_avail * m->relative_rate()); noutput_items = round_down(noutput_items, m->output_multiple()); noutput_items = std::min(noutput_items, max_noutput_items); - LOG(std::ostringstream msg; msg << "max_items_avail = " << max_items_avail; + LOG(std::ostringstream msg; + msg << m << " -- max_items_avail = " << max_items_avail; GR_LOG_INFO(d_debug_logger, msg.str());); - LOG(std::ostringstream msg; msg << "noutput_items = " << noutput_items; + LOG(std::ostringstream msg; msg << m << " -- noutput_items = " << noutput_items; GR_LOG_INFO(d_debug_logger, msg.str());); if (noutput_items == 0) { // we're blocked on input - LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN");); + // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN");); + LOG(std::ostringstream msg; msg << m << " -- BLKD_IN"; + GR_LOG_INFO(d_debug_logger, msg.str())); return BLKD_IN; } @@ -331,12 +385,11 @@ block_executor::state block_executor::run_one_iteration() d_output_items.resize(d->noutputs()); d_start_nitems_read.resize(d->ninputs()); + blkd_in_try_again: max_items_avail = 0; for (int i = 0; i < d->ninputs(); i++) { { - /* - * Acquire the mutex and grab local copies of items_available and done. - */ + // Acquire the mutex and grab local copies of items_available and done. buffer_reader_sptr in_buf = d->input(i); gr::thread::scoped_lock guard(*in_buf->mutex()); d_ninput_items[i] = in_buf->items_available(); @@ -346,11 +399,13 @@ block_executor::state block_executor::run_one_iteration() } // determine the minimum available output space - noutput_items = - min_available_space(d, m->output_multiple(), m->min_noutput_items()); + output_idx = 0; + out_try_again2: + noutput_items = min_available_space( + m, d, m->output_multiple(), m->min_noutput_items(), output_idx); if (ENABLE_LOGGING) { std::ostringstream msg; - msg << "regular "; + msg << m << " -- regular "; msg << m->relative_rate_i() << ":" << m->relative_rate_d(); msg << " max_items_avail = " << max_items_avail; msg << " noutput_items = " << noutput_items; @@ -360,13 +415,36 @@ block_executor::state block_executor::run_one_iteration() goto were_done; if (noutput_items == 0) { // we're output blocked - LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT");); - return BLKD_OUT; + // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT");); + LOG(std::ostringstream msg; msg << m << " -- BLKD_OUT"; + GR_LOG_INFO(d_debug_logger, msg.str())); + + buffer_sptr out_buf = d->output(output_idx); + + if (out_buf->output_blkd_cb_ready(m->output_multiple())) { + // Call the output blocked callback which will tell us if it was + // able to unblock the output + gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf); + if (!out_buf->output_blocked_callback(m->output_multiple())) { + LOG(std::ostringstream msg; + msg << m << " -- BLKD_OUT -- ([2] callback FAILED)"; + GR_LOG_INFO(d_debug_logger, msg.str());); + return BLKD_OUT; + } else { + LOG(std::ostringstream msg; + msg << m << " -- BLKD_OUT -- ([2] try again idx: " << output_idx + << ")"; + GR_LOG_INFO(d_debug_logger, msg.str());); + goto out_try_again2; + } + } else { + return BLKD_OUT; + } } try_again: if (m->fixed_rate()) { - // try to work it forward starting with max_items_avail. + // Try to work it forward starting with max_items_avail. // We want to try to consume all the input we've got. int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail); @@ -416,6 +494,10 @@ block_executor::state block_executor::run_one_iteration() // ask the block how much input they need to produce noutput_items m->forecast(noutput_items, d_ninput_items_required); + LOG(std::ostringstream msg; + msg << m << " -- FCAST noutput_items=" << noutput_items << " inputs_required=" + << d_ninput_items_required[0] << " inputs_avail=" << d_ninput_items[0]; + GR_LOG_INFO(d_debug_logger, msg.str())); // See if we've got sufficient input available and make sure we // didn't overflow on the input. @@ -445,12 +527,29 @@ block_executor::state block_executor::run_one_iteration() } // We're blocked on input - LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN");); + LOG(std::ostringstream msg; msg << m << " -- BLKD_IN"; + GR_LOG_INFO(d_debug_logger, msg.str())); + + buffer_reader_sptr in_buf = d->input(i); + + LOG(std::ostringstream msg; + msg << m << " (t: " << this << ") -- pre-callback"; + GR_LOG_DEBUG(d_debug_logger, msg.str())); + + if (in_buf->input_blkd_cb_ready(d_ninput_items_required[i])) { + gr::custom_lock lock(std::ref(*in_buf->mutex()), in_buf->buffer()); + if (in_buf->input_blocked_callback(d_ninput_items_required[i], + d_ninput_items[i])) { + LOG(std::ostringstream msg; msg << m << " -- BLKD_IN -- TRY AGAIN"; + GR_LOG_INFO(d_debug_logger, msg.str())); + goto blkd_in_try_again; + } + } + if (d_input_done[i]) // If the upstream block is done, we're done goto were_done; // Is it possible to ever fulfill this request? - buffer_reader_sptr in_buf = d->input(i); if (d_ninput_items_required[i] > in_buf->max_possible_items_available()) { // Nope, never going to happen... std::ostringstream msg; @@ -476,14 +575,18 @@ block_executor::state block_executor::run_one_iteration() // We've got enough data on each input to produce noutput_items. // Finish setting up the call to work. - for (int i = 0; i < d->ninputs(); i++) + for (int i = 0; i < d->ninputs(); i++) { + d->input(i)->buffer()->increment_active(); d_input_items[i] = d->input(i)->read_pointer(); + } setup_call_to_work: d->d_produce_or = 0; - for (int i = 0; i < d->noutputs(); i++) + for (int i = 0; i < d->noutputs(); i++) { + d->output(i)->increment_active(); d_output_items[i] = d->output(i)->write_pointer(); + } // determine where to start looking for new tags for (int i = 0; i < d->ninputs(); i++) @@ -504,7 +607,10 @@ block_executor::state block_executor::run_one_iteration() #endif /* GR_PERFORMANCE_COUNTERS */ LOG(std::ostringstream msg; - msg << "general_work: noutput_items = " << noutput_items << " result = " << n; + msg << m << " -- general_work: noutput_items = " << noutput_items + << " ninput_items=" << (d->ninputs() >= 1 ? d_ninput_items[0] : 0) + << " ninput_req=" << (d->ninputs() >= 1 ? d_ninput_items_required[0] : 0) + << " result = " << n; GR_LOG_INFO(d_debug_logger, msg.str());); // Adjust number of unaligned items left to process @@ -521,14 +627,21 @@ block_executor::state block_executor::run_one_iteration() m->mp_relative_rate(), m->update_rate(), d_returned_tags, - m->unique_id())) + m->unique_id())) { + d->post_work_cleanup(); goto were_done; + } - if (n == block::WORK_DONE) + if (n == block::WORK_DONE) { + d->post_work_cleanup(); goto were_done; + } - if (n != block::WORK_CALLED_PRODUCE) + if (n != block::WORK_CALLED_PRODUCE) { d->produce_each(n); // advance write pointers + } + + d->post_work_cleanup(); // For some blocks that can change their produce/consume ratio // (the relative_rate), we might want to automatically update @@ -558,13 +671,30 @@ block_executor::state block_executor::run_one_iteration() } */ + // The call to general_work() produced no output therefore the block may + // be "effectively output blocked". Call the output blocked callback + // just in case, it can do no harm. + for (int i = 0; i < d->noutputs(); i++) { + buffer_sptr out_buf = d->output(i); + LOG(std::ostringstream msg; + msg << m << " -- NO OUTPUT -- [" << i << "] -- OUTPUT BLOCKED"; + GR_LOG_DEBUG(d_debug_logger, msg.str());); + gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf); + out_buf->output_blocked_callback(m->output_multiple(), true); + LOG(std::ostringstream msg; msg << m << " -- NO OUTPUT -- [" << i + << "] -- OUTPUT BLOCKED CBACK: " << rc; + GR_LOG_DEBUG(d_debug_logger, msg.str());); + } + // Have the caller try again... return READY_NO_OUTPUT; } GR_LOG_ERROR(d_logger, "invalid state while going through iteration state machine"); were_done: - LOG(GR_LOG_INFO(d_debug_logger, "we're done");); + // LOG(GR_LOG_INFO(d_debug_logger, "we're done");); + LOG(std::ostringstream msg; msg << m << " -- we're done"; + GR_LOG_INFO(d_debug_logger, msg.str())); d->set_done(true); return DONE; } diff --git a/gnuradio-runtime/lib/buffer.cc b/gnuradio-runtime/lib/buffer.cc index cb6b949932..7fd0a39579 100644 --- a/gnuradio-runtime/lib/buffer.cc +++ b/gnuradio-runtime/lib/buffer.cc @@ -13,6 +13,10 @@ #endif #include "vmcircbuf.h" #include <gnuradio/buffer.h> +#include <gnuradio/buffer_double_mapped.h> +#include <gnuradio/buffer_reader.h> +#include <gnuradio/buffer_single_mapped.h> +#include <gnuradio/buffer_type.h> #include <gnuradio/integer_math.h> #include <gnuradio/math.h> #include <boost/format.hpp> @@ -23,7 +27,6 @@ namespace gr { static long s_buffer_count = 0; // counts for debugging storage mgmt -static long s_buffer_reader_count = 0; /* ---------------------------------------------------------------------------- Notes on storage management @@ -52,117 +55,81 @@ static long s_buffer_reader_count = 0; gr::buffer_reader goes to zero, we can successfully reclaim it. ---------------------------------------------------------------------------- */ -/* - * Compute the minimum number of buffer items that work (i.e., - * address space wrap-around works). To work is to satisfy this - * constraint for integer buffer_size and k: - * - * type_size * nitems == k * page_size - */ -static inline long minimum_buffer_items(long type_size, long page_size) -{ - return page_size / GR_GCD(type_size, page_size); -} - -buffer::buffer(int nitems, size_t sizeof_item, block_sptr link) +buffer::buffer(BufferMappingType buf_type, + int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link) : d_base(0), d_bufsize(0), + d_buf_map_type(buf_type), d_max_reader_delay(0), + d_max_reader_history(1), + d_has_history(false), d_sizeof_item(sizeof_item), d_link(link), d_write_index(0), d_abs_write_offset(0), d_done(false), - d_last_min_items_read(0) + d_last_min_items_read(0), + d_callback_flag(false), + d_active_pointer_counter(0), + d_downstream_lcm_nitems(downstream_lcm_nitems), + d_write_multiple(0) { gr::configure_default_loggers(d_logger, d_debug_logger, "buffer"); - if (!allocate_buffer(nitems, sizeof_item)) - throw std::bad_alloc(); s_buffer_count++; } -buffer_sptr make_buffer(int nitems, size_t sizeof_item, block_sptr link) +buffer_sptr make_buffer(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link, + block_sptr buf_owner) { - return buffer_sptr(new buffer(nitems, sizeof_item, link)); -} +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + gr::logger_ptr logger; + gr::logger_ptr debug_logger; + gr::configure_default_loggers(logger, debug_logger, "make_buffer"); + std::ostringstream msg; +#endif -buffer::~buffer() -{ - assert(d_readers.size() == 0); - s_buffer_count--; -} +#if DEBUG_SINGLE_MAPPED + if (1) { +#else + if (buf_owner->get_buffer_type() != buftype_DEFAULT_NON_CUSTOM::get()) { +#endif + // Buffer type is NOT the default non custom variety so allocate a + // buffer_single_mapped instance +#ifdef BUFFER_DEBUG + msg << "buffer_single_mapped nitems: " << nitems + << " -- sizeof_item: " << sizeof_item; + GR_LOG_DEBUG(logger, msg.str()); +#endif -/*! - * sets d_vmcircbuf, d_base, d_bufsize. - * returns true iff successful. - */ -bool buffer::allocate_buffer(int nitems, size_t sizeof_item) -{ - int orig_nitems = nitems; - - // Any buffersize we come up with must be a multiple of min_nitems. - int granularity = gr::vmcircbuf_sysconfig::granularity(); - int min_nitems = minimum_buffer_items(sizeof_item, granularity); - - // Round-up nitems to a multiple of min_nitems. - if (nitems % min_nitems != 0) - nitems = ((nitems / min_nitems) + 1) * min_nitems; - - // If we rounded-up a whole bunch, give the user a heads up. - // This only happens if sizeof_item is not a power of two. - - if (nitems > 2 * orig_nitems && nitems * (int)sizeof_item > granularity) { - auto msg = - str(boost::format( - "allocate_buffer: tried to allocate" - " %d items of size %d. Due to alignment requirements" - " %d were allocated. If this isn't OK, consider padding" - " your structure to a power-of-two bytes." - " On this platform, our allocation granularity is %d bytes.") % - orig_nitems % sizeof_item % nitems % granularity); - GR_LOG_WARN(d_logger, msg.c_str()); - } + return buffer_sptr(new buffer_single_mapped( + nitems, sizeof_item, downstream_lcm_nitems, link, buf_owner)); - d_bufsize = nitems; - d_vmcircbuf.reset(gr::vmcircbuf_sysconfig::make(d_bufsize * d_sizeof_item)); - if (d_vmcircbuf == 0) { - std::ostringstream msg; - msg << "gr::buffer::allocate_buffer: failed to allocate buffer of size " - << d_bufsize * d_sizeof_item / 1024 << " KB"; - GR_LOG_ERROR(d_logger, msg.str()); - return false; - } + } else { + // Default to allocating a buffer_double_mapped instance +#ifdef BUFFER_DEBUG + msg << "buffer_double_mapped nitems: " << nitems + << " -- sizeof_item: " << sizeof_item; + GR_LOG_DEBUG(logger, msg.str()); +#endif - d_base = (char*)d_vmcircbuf->pointer_to_first_copy(); - return true; + return buffer_sptr( + new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link)); + } } -int buffer::space_available() +buffer::~buffer() { - if (d_readers.empty()) - return d_bufsize - 1; // See comment below - - else { - // Find out the maximum amount of data available to our readers - - int most_data = d_readers[0]->items_available(); - uint64_t min_items_read = d_readers[0]->nitems_read(); - for (size_t i = 1; i < d_readers.size(); i++) { - most_data = std::max(most_data, d_readers[i]->items_available()); - min_items_read = std::min(min_items_read, d_readers[i]->nitems_read()); - } - - if (min_items_read != d_last_min_items_read) { - prune_tags(d_last_min_items_read); - d_last_min_items_read = min_items_read; - } - - // The -1 ensures that the case d_write_index == d_read_index is - // unambiguous. It indicates that there is no data for the reader - return d_bufsize - most_data - 1; - } + assert(d_readers.size() == 0); + s_buffer_count--; } void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; } @@ -170,8 +137,22 @@ void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; } void buffer::update_write_pointer(int nitems) { gr::thread::scoped_lock guard(*mutex()); + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + unsigned orig_wr_idx = d_write_index; +#endif + d_write_index = index_add(d_write_index, nitems); d_abs_write_offset += nitems; + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + std::ostringstream msg; + msg << "[" << this << "] update_write_pointer -- orig d_write_index: " << orig_wr_idx + << " -- nitems: " << nitems << " -- d_write_index: " << d_write_index; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif } void buffer::set_done(bool done) @@ -180,20 +161,6 @@ void buffer::set_done(bool done) d_done = done; } -buffer_reader_sptr -buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay) -{ - if (nzero_preload < 0) - throw std::invalid_argument("buffer_add_reader: nzero_preload must be >= 0"); - - buffer_reader_sptr r( - new buffer_reader(buf, buf->index_sub(buf->d_write_index, nzero_preload), link)); - r->declare_sample_delay(delay); - buf->d_readers.push_back(r.get()); - - return r; -} - void buffer::drop_reader(buffer_reader* reader) { std::vector<buffer_reader*>::iterator result = @@ -263,92 +230,40 @@ void buffer::prune_tags(uint64_t max_time) } } -long buffer_ncurrently_allocated() { return s_buffer_count; } - -// ---------------------------------------------------------------------------- - -buffer_reader::buffer_reader(buffer_sptr buffer, unsigned int read_index, block_sptr link) - : d_buffer(buffer), - d_read_index(read_index), - d_abs_read_offset(0), - d_link(link), - d_attr_delay(0) -{ - s_buffer_reader_count++; -} - -buffer_reader::~buffer_reader() +void buffer::on_lock(gr::thread::scoped_lock& lock) { - d_buffer->drop_reader(this); - s_buffer_reader_count--; + // NOTE: the protecting mutex (scoped_lock) is held by the custom_lock object + + // Wait until no other callback is active and no pointers are active for + // the buffer, then mark the callback flag active. + d_cv.wait(lock, [this]() { + return (d_callback_flag == false && d_active_pointer_counter == 0); + }); + d_callback_flag = true; } -void buffer_reader::declare_sample_delay(unsigned delay) +void buffer::on_unlock() { - d_attr_delay = delay; - d_buffer->d_max_reader_delay = std::max(d_attr_delay, d_buffer->d_max_reader_delay); -} - -unsigned buffer_reader::sample_delay() const { return d_attr_delay; } - -int buffer_reader::items_available() const -{ - return d_buffer->index_sub(d_buffer->d_write_index, d_read_index); -} + // NOTE: the protecting mutex (scoped_lock) is held by the custom_lock object -const void* buffer_reader::read_pointer() -{ - return &d_buffer->d_base[d_read_index * d_buffer->d_sizeof_item]; + // Mark the callback flag inactive and notify anyone waiting + d_callback_flag = false; + d_cv.notify_all(); } -void buffer_reader::update_read_pointer(int nitems) -{ - gr::thread::scoped_lock guard(*mutex()); - d_read_index = d_buffer->index_add(d_read_index, nitems); - d_abs_read_offset += nitems; -} +long buffer_ncurrently_allocated() { return s_buffer_count; } -void buffer_reader::get_tags_in_range(std::vector<tag_t>& v, - uint64_t abs_start, - uint64_t abs_end, - long id) +std::ostream& operator<<(std::ostream& os, const buffer& buf) { - gr::thread::scoped_lock guard(*mutex()); - - uint64_t lower_bound = abs_start - d_attr_delay; - // check for underflow and if so saturate at 0 - if (lower_bound > abs_start) - lower_bound = 0; - uint64_t upper_bound = abs_end - d_attr_delay; - // check for underflow and if so saturate at 0 - if (upper_bound > abs_end) - upper_bound = 0; - - v.clear(); - std::multimap<uint64_t, tag_t>::iterator itr = - d_buffer->get_tags_lower_bound(lower_bound); - std::multimap<uint64_t, tag_t>::iterator itr_end = - d_buffer->get_tags_upper_bound(upper_bound); - - uint64_t item_time; - while (itr != itr_end) { - item_time = (*itr).second.offset + d_attr_delay; - if ((item_time >= abs_start) && (item_time < abs_end)) { - std::vector<long>::iterator id_itr; - id_itr = std::find( - itr->second.marked_deleted.begin(), itr->second.marked_deleted.end(), id); - // If id is not in the vector of marked blocks - if (id_itr == itr->second.marked_deleted.end()) { - tag_t t = (*itr).second; - t.offset += d_attr_delay; - v.push_back(t); - v.back().marked_deleted.clear(); - } - } - itr++; + os << std::endl + << " sz: " << buf.d_bufsize << std::endl + << " nrdrs: " << buf.d_readers.size() << std::endl; + for (auto& rdr : buf.d_readers) { + os << " rd_idx: " << rdr->get_read_index() << std::endl + << " abs_rd_offset: " << rdr->get_abs_read_offset() << std::endl + << std::endl; } + return os; } -long buffer_reader_ncurrently_allocated() { return s_buffer_reader_count; } - } /* namespace gr */ diff --git a/gnuradio-runtime/lib/buffer_double_mapped.cc b/gnuradio-runtime/lib/buffer_double_mapped.cc new file mode 100644 index 0000000000..ad99c2162b --- /dev/null +++ b/gnuradio-runtime/lib/buffer_double_mapped.cc @@ -0,0 +1,155 @@ +/* -*- c++ -*- */ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include "vmcircbuf.h" +#include <gnuradio/block.h> +#include <gnuradio/buffer_double_mapped.h> +#include <gnuradio/buffer_reader.h> +#include <gnuradio/integer_math.h> +#include <gnuradio/math.h> +#include <assert.h> +#include <algorithm> +#include <iostream> +#include <stdexcept> + +namespace gr { + +/* + * Compute the minimum number of buffer items that work (i.e., + * address space wrap-around works). To work is to satisfy this + * constraint for integer buffer_size and k: + * + * type_size * nitems == k * page_size + */ +static inline long minimum_buffer_items(long type_size, long page_size) +{ + return page_size / GR_GCD(type_size, page_size); +} + + +buffer_double_mapped::buffer_double_mapped(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link) + : buffer(BufferMappingType::DoubleMapped, + nitems, + sizeof_item, + downstream_lcm_nitems, + link) +{ + gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_double_mapped"); + if (!allocate_buffer(nitems, sizeof_item)) + throw std::bad_alloc(); + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + { + std::ostringstream msg; + msg << "[" << this << "] " + << "buffer_double_mapped constructor -- history: " << link->history(); + GR_LOG_DEBUG(d_logger, msg.str()); + } +#endif +} + +buffer_sptr make_buffer_double_mapped(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link) +{ + return buffer_sptr( + new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link)); +} + +buffer_double_mapped::~buffer_double_mapped() {} + +/*! + * sets d_vmcircbuf, d_base, d_bufsize. + * returns true iff successful. + */ +bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item) +{ + int orig_nitems = nitems; + + // Any buffer size we come up with must be a multiple of min_nitems. + int granularity = gr::vmcircbuf_sysconfig::granularity(); + int min_nitems = minimum_buffer_items(sizeof_item, granularity); + + // Round-up nitems to a multiple of min_nitems. + if (nitems % min_nitems != 0) + nitems = ((nitems / min_nitems) + 1) * min_nitems; + + // If we rounded-up a whole bunch, give the user a heads up. + // This only happens if sizeof_item is not a power of two. + if (nitems > 2 * orig_nitems && nitems * (int)sizeof_item > granularity) { + auto msg = + str(boost::format( + "allocate_buffer: tried to allocate" + " %d items of size %d. Due to alignment requirements" + " %d were allocated. If this isn't OK, consider padding" + " your structure to a power-of-two bytes." + " On this platform, our allocation granularity is %d bytes.") % + orig_nitems % sizeof_item % nitems % granularity); + GR_LOG_WARN(d_logger, msg.c_str()); + } + + d_bufsize = nitems; + d_vmcircbuf.reset(gr::vmcircbuf_sysconfig::make(d_bufsize * d_sizeof_item)); + if (d_vmcircbuf == 0) { + std::ostringstream msg; + msg << "gr::buffer::allocate_buffer: failed to allocate buffer of size " + << d_bufsize * d_sizeof_item / 1024 << " KB"; + GR_LOG_ERROR(d_logger, msg.str()); + return false; + } + + d_base = (char*)d_vmcircbuf->pointer_to_first_copy(); + return true; +} + +int buffer_double_mapped::space_available() +{ + if (d_readers.empty()) + return d_bufsize - 1; // See comment below + + else { + + // Find out the maximum amount of data available to our readers + int most_data = d_readers[0]->items_available(); + uint64_t min_items_read = d_readers[0]->nitems_read(); + for (size_t i = 1; i < d_readers.size(); i++) { + most_data = std::max(most_data, d_readers[i]->items_available()); + min_items_read = std::min(min_items_read, d_readers[i]->nitems_read()); + } + + if (min_items_read != d_last_min_items_read) { + prune_tags(d_last_min_items_read); + d_last_min_items_read = min_items_read; + } + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "space_available() called d_write_index: " << d_write_index + << " -- space_available: " << (d_bufsize - most_data - 1); + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // The -1 ensures that the case d_write_index == d_read_index is + // unambiguous. It indicates that there is no data for the reader + return d_bufsize - most_data - 1; + } +} + +} /* namespace gr */ diff --git a/gnuradio-runtime/lib/buffer_reader.cc b/gnuradio-runtime/lib/buffer_reader.cc new file mode 100644 index 0000000000..7ead53032e --- /dev/null +++ b/gnuradio-runtime/lib/buffer_reader.cc @@ -0,0 +1,180 @@ +/* -*- c++ -*- */ +/* + * Copyright 2004,2009,2010,2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include <gnuradio/block.h> +#include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader.h> +#include <gnuradio/buffer_reader_sm.h> +#include <gnuradio/integer_math.h> +#include <gnuradio/math.h> +#include <assert.h> +#include <algorithm> +#include <iostream> +#include <stdexcept> + +namespace gr { + +static long s_buffer_reader_count = 0; + +buffer_reader_sptr +buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay) +{ + if (nzero_preload < 0) + throw std::invalid_argument("buffer_add_reader: nzero_preload must be >= 0"); + + buffer_reader_sptr r; + + if (buf->get_mapping_type() == BufferMappingType::DoubleMapped) { + r.reset(new buffer_reader( + buf, buf->index_sub(buf->d_write_index, nzero_preload), link)); + r->declare_sample_delay(delay); + } else if (buf->get_mapping_type() == BufferMappingType::SingleMapped) { + r.reset(new buffer_reader_sm( + buf, buf->index_sub(buf->d_write_index, nzero_preload), link)); + r->declare_sample_delay(delay); + + // Update reader block history + buf->update_reader_block_history(link->history(), delay); + r->d_read_index = buf->d_write_index - nzero_preload; + } + + buf->d_readers.push_back(r.get()); + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + std::cerr << " [" << buf.get() << ";" << r.get() + << "] buffer_add_reader() nzero_preload " << nzero_preload + << " -- delay: " << delay << " -- history: " << link->history() + << " -- RD_idx: " << r->d_read_index << std::endl; +#endif + + return r; +} + +buffer_reader::buffer_reader(buffer_sptr buffer, unsigned int read_index, block_sptr link) + : d_buffer(buffer), + d_read_index(read_index), + d_abs_read_offset(0), + d_link(link), + d_attr_delay(0) +{ +#ifdef BUFFER_DEBUG + gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_reader"); +#endif + + s_buffer_reader_count++; +} + +buffer_reader::~buffer_reader() +{ + d_buffer->drop_reader(this); + s_buffer_reader_count--; +} + +void buffer_reader::declare_sample_delay(unsigned delay) +{ + d_attr_delay = delay; + d_buffer->d_max_reader_delay = std::max(d_attr_delay, d_buffer->d_max_reader_delay); +} + +unsigned buffer_reader::sample_delay() const { return d_attr_delay; } + +int buffer_reader::items_available() // const +{ + int available = d_buffer->index_sub(d_buffer->d_write_index, d_read_index); + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + std::ostringstream msg; + msg << "[" << d_buffer << ";" << this << "] " + << "items_available() WR_idx: " << d_buffer->d_write_index + << " -- WR items: " << d_buffer->nitems_written() + << " -- RD_idx: " << d_read_index << " -- RD items: " << nitems_read() << " (-" + << d_attr_delay << ") -- available: " << available; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + return available; +} + +const void* buffer_reader::read_pointer() +{ + return &d_buffer->d_base[d_read_index * d_buffer->d_sizeof_item]; +} + +void buffer_reader::update_read_pointer(int nitems) +{ + gr::thread::scoped_lock guard(*mutex()); + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + unsigned orig_rd_idx = d_read_index; +#endif + + d_read_index = d_buffer->index_add(d_read_index, nitems); + d_abs_read_offset += nitems; + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + std::ostringstream msg; + msg << "[" << d_buffer << ";" << this + << "] update_read_pointer -- orig d_read_index: " << orig_rd_idx + << " -- nitems: " << nitems << " -- d_read_index: " << d_read_index; + GR_LOG_DEBUG(d_buffer->d_logger, msg.str()); +#endif +} + +void buffer_reader::get_tags_in_range(std::vector<tag_t>& v, + uint64_t abs_start, + uint64_t abs_end, + long id) +{ + gr::thread::scoped_lock guard(*mutex()); + + uint64_t lower_bound = abs_start - d_attr_delay; + // check for underflow and if so saturate at 0 + if (lower_bound > abs_start) + lower_bound = 0; + uint64_t upper_bound = abs_end - d_attr_delay; + // check for underflow and if so saturate at 0 + if (upper_bound > abs_end) + upper_bound = 0; + + v.clear(); + std::multimap<uint64_t, tag_t>::iterator itr = + d_buffer->get_tags_lower_bound(lower_bound); + std::multimap<uint64_t, tag_t>::iterator itr_end = + d_buffer->get_tags_upper_bound(upper_bound); + + uint64_t item_time; + while (itr != itr_end) { + item_time = (*itr).second.offset + d_attr_delay; + if ((item_time >= abs_start) && (item_time < abs_end)) { + std::vector<long>::iterator id_itr; + id_itr = std::find( + itr->second.marked_deleted.begin(), itr->second.marked_deleted.end(), id); + // If id is not in the vector of marked blocks + if (id_itr == itr->second.marked_deleted.end()) { + tag_t t = (*itr).second; + t.offset += d_attr_delay; + v.push_back(t); + v.back().marked_deleted.clear(); + } + } + itr++; + } +} + +long buffer_reader_ncurrently_allocated() { return s_buffer_reader_count; } + +} /* namespace gr */ diff --git a/gnuradio-runtime/lib/buffer_reader_sm.cc b/gnuradio-runtime/lib/buffer_reader_sm.cc new file mode 100644 index 0000000000..9e0fac584f --- /dev/null +++ b/gnuradio-runtime/lib/buffer_reader_sm.cc @@ -0,0 +1,149 @@ +/* -*- c++ -*- */ +/* + * Copyright 2004,2009,2010,2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include <gnuradio/block.h> +#include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader_sm.h> +#include <gnuradio/integer_math.h> +#include <gnuradio/math.h> +#include <assert.h> +#include <algorithm> +#include <iostream> +#include <limits> +#include <stdexcept> + +namespace gr { + +buffer_reader_sm::~buffer_reader_sm() {} + +int buffer_reader_sm::items_available() +{ + int available = 0; + + if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) { + if (d_buffer->d_write_index == d_read_index) { + if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) { + available = d_buffer->d_bufsize - d_read_index; + } + } else { + available = d_buffer->index_sub(d_buffer->d_write_index, d_read_index); + } + } + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << d_buffer << ";" << this << "] " + << "items_available() WR_idx: " << d_buffer->d_write_index + << " -- WR items: " << d_buffer->nitems_written() + << " -- RD_idx: " << d_read_index << " -- RD items: " << nitems_read() << " (-" + << d_attr_delay << ") -- available: " << available; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + return available; +} + +bool buffer_reader_sm::input_blkd_cb_ready(int items_required) const +{ + gr::thread::scoped_lock(*d_buffer->mutex()); + + return (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) && + (d_buffer->d_write_index < d_read_index)); +} + +bool buffer_reader_sm::input_blocked_callback(int items_required, int items_avail) +{ + // Maybe adjust read pointers from min read index? + // This would mean that *all* readers must be > (passed) the write index + if (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) && + (d_buffer->d_write_index < d_read_index)) { + + // Update items available before going farther as it could be stale + items_avail = items_available(); + + // Find reader with the smallest read index that is greater than the + // write index + uint32_t min_reader_index = std::numeric_limits<uint32_t>::max(); + uint32_t min_read_idx = std::numeric_limits<uint32_t>::max(); + for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) { + if (d_buffer->d_readers[idx]->d_read_index > d_buffer->d_write_index) { + // Record index of reader with minimum read-index + if (d_buffer->d_readers[idx]->d_read_index < min_read_idx) { + min_read_idx = d_buffer->d_readers[idx]->d_read_index; + min_reader_index = idx; + } + } + } + + // Note items_avail might be zero, that's okay. + items_avail += d_read_index - min_read_idx; + int gap = min_read_idx - d_buffer->d_write_index; + if (items_avail > gap) { + return false; + } + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << d_buffer << ";" << this << "] " + << "input_blocked_callback() WR_idx: " << d_buffer->d_write_index + << " -- WR items: " << d_buffer->nitems_written() + << " -- BUFSIZE: " << d_buffer->d_bufsize << " -- RD_idx: " << min_read_idx; + for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) { + if (idx != min_reader_index) { + msg << " -- OTHER_RDR: " << d_buffer->d_readers[idx]->d_read_index; + } + } + + msg << " -- GAP: " << gap << " -- items_required: " << items_required + << " -- items_avail: " << items_avail; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Shift existing data down to make room for blocked data at end of buffer + uint32_t move_data_size = d_buffer->d_write_index * d_buffer->d_sizeof_item; + char* dest = d_buffer->d_base + (items_avail * d_buffer->d_sizeof_item); + std::memmove(dest, d_buffer->d_base, move_data_size); + + // Next copy the data from the end of the buffer back to the beginning + uint32_t avail_data_size = items_avail * d_buffer->d_sizeof_item; + char* src = d_buffer->d_base + (min_read_idx * d_buffer->d_sizeof_item); + std::memcpy(d_buffer->d_base, src, avail_data_size); + + // Now adjust write pointer + d_buffer->d_write_index += items_avail; + + // Finally adjust all reader pointers + for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) { + if (idx == min_reader_index) { + d_buffer->d_readers[idx]->d_read_index = 0; + } else { + d_buffer->d_readers[idx]->d_read_index += items_avail; + d_buffer->d_readers[idx]->d_read_index %= d_buffer->d_bufsize; + } + } + + return true; + } + + return false; +} + +buffer_reader_sm::buffer_reader_sm(buffer_sptr buffer, + unsigned int read_index, + block_sptr link) + : buffer_reader(buffer, read_index, link) +{ +} + + +} /* namespace gr */ diff --git a/gnuradio-runtime/lib/buffer_single_mapped.cc b/gnuradio-runtime/lib/buffer_single_mapped.cc new file mode 100644 index 0000000000..6024396557 --- /dev/null +++ b/gnuradio-runtime/lib/buffer_single_mapped.cc @@ -0,0 +1,297 @@ +/* -*- c++ -*- */ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include <gnuradio/block.h> +#include <gnuradio/buffer_reader.h> +#include <gnuradio/buffer_single_mapped.h> +#include <gnuradio/integer_math.h> +#include <gnuradio/math.h> +#include <gnuradio/thread/thread.h> +#include <assert.h> +#include <algorithm> +#include <iostream> +#include <stdexcept> + +namespace gr { + +buffer_single_mapped::buffer_single_mapped(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link, + block_sptr buf_owner) + : buffer(BufferMappingType::SingleMapped, + nitems, + sizeof_item, + downstream_lcm_nitems, + link), + d_buf_owner(buf_owner), + d_buffer(nullptr, + std::bind(&buffer_single_mapped::deleter, this, std::placeholders::_1)) +{ + gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_single_mapped"); + if (!allocate_buffer(nitems, sizeof_item, downstream_lcm_nitems)) + throw std::bad_alloc(); + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + { + std::ostringstream msg; + msg << "[" << this << "] " + << "buffer_single_mapped constructor -- history: " << link->history(); + GR_LOG_DEBUG(d_logger, msg.str()); + } +#endif +} + +buffer_single_mapped::~buffer_single_mapped() {} + +/*! + * Allocates underlying buffer. + * returns true iff successful. + */ +bool buffer_single_mapped::allocate_buffer(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems) +{ +#ifdef BUFFER_DEBUG + int orig_nitems = nitems; +#endif + + // Unlike the double mapped buffer case that can easily wrap back onto itself + // for both reads and writes the single mapped case needs to be aware of read + // and write granularity and size the underlying buffer accordingly. Otherwise + // the calls to space_available() and items_available() may return values that + // are too small and the scheduler will get stuck. + uint64_t write_granularity = 1; + + if (link()->fixed_rate()) { + // Fixed rate + int num_inputs = + link()->fixed_rate_noutput_to_ninput(1) - (link()->history() - 1); + write_granularity = + link()->fixed_rate_ninput_to_noutput(num_inputs + (link()->history() - 1)); + } + + if (link()->relative_rate() != 1.0) { + // Some blocks say they have fixed rate but actually have a relative + // rate set (looking at you puncture_bb...) so make this a separate + // check. + + // Relative rate + write_granularity = link()->relative_rate_i(); + } + + // If the output multiple has been set explicitly then adjust the write + // granularity. + if (link()->output_multiple_set()) { + write_granularity = + GR_LCM(write_granularity, (uint64_t)link()->output_multiple()); + } + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "WRITE GRANULARITY: " << write_granularity; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Adjust size so output buffer size is a multiple of the write granularity + if (write_granularity != 1 || downstream_lcm_nitems != 1) { + uint64_t size_align_adjust = GR_LCM(write_granularity, downstream_lcm_nitems); + uint64_t remainder = nitems % size_align_adjust; + nitems += (remainder > 0) ? (size_align_adjust - remainder) : 0; + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "allocate_buffer()** called nitems: " << orig_nitems + << " -- read_multiple: " << downstream_lcm_nitems + << " -- write_multiple: " << write_granularity + << " -- NEW nitems: " << nitems; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + } + + // Allocate a new custom buffer from the owning block + char* buf = buf_owner()->allocate_custom_buffer(nitems * sizeof_item); + assert(buf != nullptr); + d_buffer.reset(buf); + + d_base = d_buffer.get(); + d_bufsize = nitems; + + d_downstream_lcm_nitems = downstream_lcm_nitems; + d_write_multiple = write_granularity; + + return true; +} + +bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple) +{ + uint32_t space_avail = 0; + { + gr::thread::scoped_lock(*this->mutex()); + space_avail = space_available(); + } + return ((space_avail > 0) && + ((space_avail / output_multiple) * output_multiple == 0)); +} + + +bool buffer_single_mapped::output_blocked_callback(int output_multiple, bool force) +{ + uint32_t space_avail = space_available(); + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "output_blocked_callback()*** WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() + << " -- output_multiple: " << output_multiple + << " -- space_avail: " << space_avail << " -- force: " << force; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) || + force) { + // Find reader with the smallest read index + uint32_t min_read_idx = d_readers[0]->d_read_index; + for (size_t idx = 1; idx < d_readers.size(); ++idx) { + // Record index of reader with minimum read-index + if (d_readers[idx]->d_read_index < min_read_idx) { + min_read_idx = d_readers[idx]->d_read_index; + } + } + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "output_blocked_callback() WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx + << " -- shortcircuit: " + << ((min_read_idx == 0) || (min_read_idx >= d_write_index)) + << " -- to_move_items: " << (d_write_index - min_read_idx) + << " -- space_avail: " << space_avail << " -- force: " << force; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Make sure we have enough room to start writing back at the beginning + if ((min_read_idx == 0) || (min_read_idx >= d_write_index)) { + return false; + } + + // Determine how much "to be read" data needs to be moved + int to_move_items = d_write_index - min_read_idx; + assert(to_move_items > 0); + uint32_t to_move_bytes = to_move_items * d_sizeof_item; + + // Shift "to be read" data back to the beginning of the buffer + std::memmove(d_base, d_base + (min_read_idx * d_sizeof_item), to_move_bytes); + + // Adjust write index and each reader index + d_write_index -= min_read_idx; + + for (size_t idx = 0; idx < d_readers.size(); ++idx) { + d_readers[idx]->d_read_index -= min_read_idx; + } + + return true; + } + + return false; +} + +int buffer_single_mapped::space_available() +{ + if (d_readers.empty()) + return d_bufsize; + + else { + + size_t min_items_read_idx = 0; + uint64_t min_items_read = d_readers[0]->nitems_read(); + for (size_t idx = 1; idx < d_readers.size(); ++idx) { + // Record index of reader with minimum nitems read + if (d_readers[idx]->nitems_read() < + d_readers[min_items_read_idx]->nitems_read()) { + min_items_read_idx = idx; + } + min_items_read = std::min(min_items_read, d_readers[idx]->nitems_read()); + } + + buffer_reader* min_idx_reader = d_readers[min_items_read_idx]; + unsigned min_read_index = d_readers[min_items_read_idx]->d_read_index; + + // For single mapped buffer there is no wrapping beyond the end of the + // buffer +#ifdef BUFFER_DEBUG + int thecase = 4; // REMOVE ME - just for debug +#endif + int space = d_bufsize - d_write_index; + + if (min_read_index == d_write_index) { +#ifdef BUFFER_DEBUG + thecase = 1; +#endif + + // If the (min) read index and write index are equal then the buffer + // is either completely empty or completely full depending on if + // the number of items read matches the number written + size_t offset = ((min_idx_reader->link()->history() - 1) + + min_idx_reader->sample_delay()); + if ((min_idx_reader->nitems_read() - offset) != nitems_written()) { +#ifdef BUFFER_DEBUG + thecase = 2; +#endif + space = 0; + } + } else if (min_read_index > d_write_index) { +#ifdef BUFFER_DEBUG + thecase = 3; +#endif + space = min_read_index - d_write_index; + // Leave extra space in case the reader gets stuck and needs realignment + { + if ((d_write_index > (d_bufsize / 2)) || + (min_read_index < (d_bufsize / 2))) { +#ifdef BUFFER_DEBUG + thecase = 17; +#endif + space = 0; + } else { + space = (d_bufsize / 2) - d_write_index; + } + } + } + + if (min_items_read != d_last_min_items_read) { + prune_tags(d_last_min_items_read); + d_last_min_items_read = min_items_read; + } + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "space_available() called (case: " << thecase + << ") d_write_index: " << d_write_index << " (" << nitems_written() << ") " + << " -- min_read_index: " << min_read_index << " (" + << min_idx_reader->nitems_read() << ") " + << " -- space: " << space + << " (sample delay: " << min_idx_reader->sample_delay() << ")"; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + return space; + } +} + +} /* namespace gr */ diff --git a/gnuradio-runtime/lib/buffer_type.cc b/gnuradio-runtime/lib/buffer_type.cc new file mode 100644 index 0000000000..b667eded1e --- /dev/null +++ b/gnuradio-runtime/lib/buffer_type.cc @@ -0,0 +1,18 @@ +/* -*- c++ -*- */ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ +#include <gnuradio/buffer_type.h> + +namespace gr { + +uint32_t buffer_type_base::s_nextId = 0; +std::mutex buffer_type_base::s_mutex; + + +} /* namespace gr */
\ No newline at end of file diff --git a/gnuradio-runtime/lib/flat_flowgraph.cc b/gnuradio-runtime/lib/flat_flowgraph.cc index 0df75553e0..b9986f8370 100644 --- a/gnuradio-runtime/lib/flat_flowgraph.cc +++ b/gnuradio-runtime/lib/flat_flowgraph.cc @@ -15,6 +15,9 @@ #include "flat_flowgraph.h" #include <gnuradio/block_detail.h> #include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader.h> +#include <gnuradio/buffer_type.h> +#include <gnuradio/integer_math.h> #include <gnuradio/logger.h> #include <gnuradio/prefs.h> #include <volk/volk.h> @@ -24,6 +27,7 @@ namespace gr { + // 32Kbyte buffer size between blocks #define GR_FIXED_BUFFER_SIZE (32 * (1L << 10)) @@ -44,8 +48,9 @@ void flat_flowgraph::setup_connections() basic_block_vector_t blocks = calc_used_blocks(); // Assign block details to blocks - for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) - cast_to_block_sptr(*p)->set_detail(allocate_block_detail(*p)); + for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) { + allocate_block_detail(*p); + } // Connect inputs to outputs for each block for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) { @@ -56,7 +61,7 @@ void flat_flowgraph::setup_connections() block->set_is_unaligned(false); } - // Connect message ports connetions + // Connect message ports connections for (msg_edge_viter_t i = d_msg_edges.begin(); i != d_msg_edges.end(); i++) { GR_LOG_DEBUG( d_debug_logger, @@ -67,11 +72,10 @@ void flat_flowgraph::setup_connections() } } -block_detail_sptr flat_flowgraph::allocate_block_detail(basic_block_sptr block) +void flat_flowgraph::allocate_block_detail(basic_block_sptr block) { int ninputs = calc_used_ports(block, true).size(); int noutputs = calc_used_ports(block, false).size(); - block_detail_sptr detail = make_block_detail(ninputs, noutputs); block_sptr grblock = cast_to_block_sptr(block); if (!grblock) @@ -80,98 +84,88 @@ block_detail_sptr flat_flowgraph::allocate_block_detail(basic_block_sptr block) block->alias()) .str()); - GR_LOG_DEBUG(d_debug_logger, "Creating block detail for " + block->identifier()); + // Determine the downstream max per output port + std::vector<int> downstream_max_nitems(noutputs, 0); + std::vector<uint64_t> downstream_lcm_nitems(noutputs, 1); +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "BLOCK: " << block->identifier(); + GR_LOG_DEBUG(d_logger, msg.str()); // could also be d_debug_logger +#endif for (int i = 0; i < noutputs; i++) { - grblock->expand_minmax_buffer(i); + int nitems = 0; + uint64_t lcm_nitems = 1; + basic_block_vector_t downstream_blocks = calc_downstream_blocks(grblock, i); + for (basic_block_viter_t blk = downstream_blocks.begin(); + blk != downstream_blocks.end(); + blk++) { + block_sptr dgrblock = cast_to_block_sptr(*blk); + if (!dgrblock) + throw std::runtime_error("allocate_buffer found non-gr::block"); + +#ifdef BUFFER_DEBUG + msg.str(""); + msg << " DWNSTRM BLOCK: " << dgrblock->identifier(); + GR_LOG_DEBUG(d_logger, msg.str()); +#endif - buffer_sptr buffer = allocate_buffer(block, i); - GR_LOG_DEBUG(d_debug_logger, - "Allocated buffer for output " + block->identifier() + " " + - std::to_string(i)); - detail->set_output(i, buffer); - - // Update the block's max_output_buffer based on what was actually allocated. - if ((grblock->max_output_buffer(i) != buffer->bufsize()) && - (grblock->max_output_buffer(i) != -1)) - GR_LOG_WARN(d_logger, - boost::format("Block (%1%) max output buffer set to %2%" - " instead of requested %3%") % - grblock->alias() % buffer->bufsize() % - grblock->max_output_buffer(i)); - grblock->set_max_output_buffer(i, buffer->bufsize()); - } + // If any downstream blocks are decimators and/or have a large + // output_multiple, ensure we have a buffer at least twice their + // decimation factor*output_multiple + double decimation = (1.0 / dgrblock->relative_rate()); + int multiple = dgrblock->output_multiple(); + int history = dgrblock->history(); + nitems = + std::max(nitems, static_cast<int>(2 * (decimation * multiple + history))); + + // Calculate the LCM of downstream reader nitems +#ifdef BUFFER_DEBUG + msg.str(""); + msg << " OUT MULTIPLE: " << multiple; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif - return detail; -} + if (dgrblock->fixed_rate()) { + lcm_nitems = GR_LCM(lcm_nitems, + (uint64_t)(dgrblock->fixed_rate_noutput_to_ninput(1) - + (dgrblock->history() - 1))); + } + if (dgrblock->relative_rate() != 1.0) { + // Relative rate + lcm_nitems = GR_LCM(lcm_nitems, dgrblock->relative_rate_d()); + } -buffer_sptr flat_flowgraph::allocate_buffer(basic_block_sptr block, int port) -{ - block_sptr grblock = cast_to_block_sptr(block); - if (!grblock) - throw std::runtime_error("allocate_buffer found non-gr::block"); - int item_size = block->output_signature()->sizeof_stream_item(port); - - // *2 because we're now only filling them 1/2 way in order to - // increase the available parallelism when using the TPB scheduler. - // (We're double buffering, where we used to single buffer) - int nitems = s_fixed_buffer_size * 2 / item_size; - - // Make sure there are at least twice the output_multiple no. of items - if (nitems < 2 * grblock->output_multiple()) // Note: this means output_multiple() - nitems = 2 * grblock->output_multiple(); // can't be changed by block dynamically - - // If any downstream blocks are decimators and/or have a large output_multiple, - // ensure we have a buffer at least twice their decimation factor*output_multiple - basic_block_vector_t blocks = calc_downstream_blocks(block, port); - - // limit buffer size if indicated - if (grblock->max_output_buffer(port) > 0) { - // GR_LOG_INFO(d_debug_logger, boost::format("constraining output items to %d") - // % block->max_output_buffer(port)); - nitems = std::min((long)nitems, (long)grblock->max_output_buffer(port)); - nitems -= nitems % grblock->output_multiple(); - if (nitems < 1) - throw std::runtime_error("problems allocating a buffer with the given max " - "output buffer constraint!"); - } else if (grblock->min_output_buffer(port) > 0) { - nitems = std::max((long)nitems, (long)grblock->min_output_buffer(port)); - nitems -= nitems % grblock->output_multiple(); - if (nitems < 1) - throw std::runtime_error("problems allocating a buffer with the given min " - "output buffer constraint!"); - } + // Sanity check, make sure lcm_nitems is at least 1 + if (lcm_nitems < 1) { + lcm_nitems = 1; + } - for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) { - block_sptr dgrblock = cast_to_block_sptr(*p); - if (!dgrblock) - throw std::runtime_error("allocate_buffer found non-gr::block"); - - double decimation = (1.0 / dgrblock->relative_rate()); - int multiple = dgrblock->output_multiple(); - int history = dgrblock->history(); - nitems = - std::max(nitems, static_cast<int>(2 * (decimation * multiple + history))); - } +#ifdef BUFFER_DEBUG + msg.str(""); + msg << " NINPUT_ITEMS: " << nitems; + GR_LOG_DEBUG(d_logger, msg.str()); - // std::cout << "make_buffer(" << nitems << ", " << item_size << ", " << grblock << - // "\n"; - // We're going to let this fail once and retry. If that fails, - // throw and exit. - buffer_sptr b; - try { - b = make_buffer(nitems, item_size, grblock); - } catch (std::bad_alloc&) { - b = make_buffer(nitems, item_size, grblock); - } + msg.str(""); + msg << " LCM NITEMS: " << lcm_nitems; + GR_LOG_DEBUG(d_logger, msg.str()); - // Set the max noutput items size here to make sure it's always - // set in the block and available in the start() method. - // But don't overwrite if the user has set this externally. - if (!grblock->is_set_max_noutput_items()) - grblock->set_max_noutput_items(nitems); + msg.str(""); + msg << " HISTORY: " << dgrblock->history(); + GR_LOG_DEBUG(d_logger, msg.str()); - return b; + msg.str(""); + msg << " DELAY: " << dgrblock->sample_delay(0); + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + } + downstream_max_nitems[i] = nitems; + downstream_lcm_nitems[i] = lcm_nitems; + } + + // Allocate the block detail and necessary buffers + grblock->allocate_detail( + ninputs, noutputs, downstream_max_nitems, downstream_lcm_nitems); } void flat_flowgraph::connect_block_inputs(basic_block_sptr block) @@ -194,8 +188,40 @@ void flat_flowgraph::connect_block_inputs(basic_block_sptr block) block_sptr src_grblock = cast_to_block_sptr(src_block); if (!src_grblock) throw std::runtime_error("connect_block_inputs found non-gr::block"); - buffer_sptr src_buffer = src_grblock->detail()->output(src_port); + buffer_sptr src_buffer; + buffer_type_t src_buf_type = src_grblock->get_buffer_type(); + buffer_type_t dest_buf_type = grblock->get_buffer_type(); + if (dest_buf_type == buftype_DEFAULT_NON_CUSTOM::get() || + dest_buf_type == src_buf_type) { + // The block is not using a custom buffer OR the block and the upstream + // block both use the same kind of custom buffer + src_buffer = src_grblock->detail()->output(src_port); + } else { + if (dest_buf_type != buftype_DEFAULT_NON_CUSTOM::get() && + src_buf_type == buftype_DEFAULT_NON_CUSTOM::get()) { + // The block uses a custom buffer but the upstream block does not + // therefore the upstream block's buffer can be replaced with the + // type of buffer that the block needs + std::ostringstream msg; + msg << "Block: " << grblock->identifier() + << "replacing upstream block: " << src_grblock->identifier() + << " buffer with a custom buffer"; + GR_LOG_DEBUG(d_debug_logger, msg.str()); + src_buffer = src_grblock->replace_buffer(src_port, grblock); + } else { + // Both the block and upstream block use incompatible buffer types + // which is not currently allowed + std::ostringstream msg; + msg << "Block: " << grblock->identifier() + << " and upstream block: " << src_grblock->identifier() + << " use incompatible custom buffer types (" << dest_buf_type.name() + << " -- " << src_buf_type.name() << ") --> " + << (dest_buf_type == src_buf_type); + GR_LOG_ERROR(d_logger, msg.str()); + throw std::runtime_error(msg.str()); + } + } GR_LOG_DEBUG(d_debug_logger, "Setting input " + std::to_string(dst_port) + " from edge " + @@ -220,7 +246,7 @@ void flat_flowgraph::merge_connections(flat_flowgraph_sptr old_ffg) if (!block->detail()) { GR_LOG_DEBUG(d_debug_logger, "merge: allocating new detail for block " + block->identifier()); - block->set_detail(allocate_block_detail(block)); + allocate_block_detail(block); } else { GR_LOG_DEBUG(d_debug_logger, "merge: reusing original detail for block " + diff --git a/gnuradio-runtime/lib/flat_flowgraph.h b/gnuradio-runtime/lib/flat_flowgraph.h index deb0fe2c6a..71645d6a25 100644 --- a/gnuradio-runtime/lib/flat_flowgraph.h +++ b/gnuradio-runtime/lib/flat_flowgraph.h @@ -78,8 +78,7 @@ public: private: flat_flowgraph(); - block_detail_sptr allocate_block_detail(basic_block_sptr block); - buffer_sptr allocate_buffer(basic_block_sptr block, int port); + void allocate_block_detail(basic_block_sptr block); void connect_block_inputs(basic_block_sptr block); /* When reusing a flowgraph's blocks, this call makes sure all of diff --git a/gnuradio-runtime/lib/qa_buffer.cc b/gnuradio-runtime/lib/qa_buffer.cc index c0e9c0d130..cefe548338 100644 --- a/gnuradio-runtime/lib/qa_buffer.cc +++ b/gnuradio-runtime/lib/qa_buffer.cc @@ -14,6 +14,8 @@ #include <gnuradio/buffer.h> +#include <gnuradio/buffer_double_mapped.h> +#include <gnuradio/buffer_reader.h> #include <gnuradio/random.h> #include <boost/test/unit_test.hpp> #include <cstdlib> @@ -40,7 +42,8 @@ static void t0_body() int nitems = 4000 / sizeof(int); int counter = 0; - gr::buffer_sptr buf(gr::make_buffer(nitems, sizeof(int), gr::block_sptr())); + gr::buffer_sptr buf( + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); int last_sa; int sa; @@ -74,7 +77,8 @@ static void t1_body() int write_counter = 0; int read_counter = 0; - gr::buffer_sptr buf(gr::make_buffer(nitems, sizeof(int), gr::block_sptr())); + gr::buffer_sptr buf( + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr())); int sa; @@ -145,7 +149,8 @@ static void t2_body() int nitems = (64 * (1L << 10)) / sizeof(int); // 64K worth of ints - gr::buffer_sptr buf(gr::make_buffer(nitems, sizeof(int), gr::block_sptr())); + gr::buffer_sptr buf( + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr())); int read_counter = 0; @@ -210,7 +215,8 @@ static void t3_body() int nitems = (64 * (1L << 10)) / sizeof(int); static const int N = 5; - gr::buffer_sptr buf(gr::make_buffer(nitems, sizeof(int), gr::block_sptr())); + gr::buffer_sptr buf( + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); gr::buffer_reader_sptr reader[N]; int read_counter[N]; int write_counter = 0; diff --git a/gnuradio-runtime/lib/tpb_detail.cc b/gnuradio-runtime/lib/tpb_detail.cc index 312a5bf5e2..badc4cf9ef 100644 --- a/gnuradio-runtime/lib/tpb_detail.cc +++ b/gnuradio-runtime/lib/tpb_detail.cc @@ -15,6 +15,7 @@ #include <gnuradio/block.h> #include <gnuradio/block_detail.h> #include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader.h> #include <gnuradio/tpb_detail.h> namespace gr { |