diff options
Diffstat (limited to 'gnuradio-runtime/lib/buffer_single_mapped.cc')
-rw-r--r-- | gnuradio-runtime/lib/buffer_single_mapped.cc | 337 |
1 files changed, 235 insertions, 102 deletions
diff --git a/gnuradio-runtime/lib/buffer_single_mapped.cc b/gnuradio-runtime/lib/buffer_single_mapped.cc index 6024396557..db7959c5b4 100644 --- a/gnuradio-runtime/lib/buffer_single_mapped.cc +++ b/gnuradio-runtime/lib/buffer_single_mapped.cc @@ -19,6 +19,8 @@ #include <gnuradio/thread/thread.h> #include <assert.h> #include <algorithm> +#include <cstdlib> +#include <cstring> #include <iostream> #include <stdexcept> @@ -27,30 +29,18 @@ namespace gr { buffer_single_mapped::buffer_single_mapped(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, block_sptr link, block_sptr buf_owner) - : buffer(BufferMappingType::SingleMapped, + : buffer(buffer_mapping_type::single_mapped, nitems, sizeof_item, downstream_lcm_nitems, + downstream_max_out_mult, link), d_buf_owner(buf_owner), - d_buffer(nullptr, - std::bind(&buffer_single_mapped::deleter, this, std::placeholders::_1)) + d_buffer(nullptr) { - gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_single_mapped"); - if (!allocate_buffer(nitems, sizeof_item, downstream_lcm_nitems)) - throw std::bad_alloc(); - -#ifdef BUFFER_DEBUG - // BUFFER DEBUG - { - std::ostringstream msg; - msg << "[" << this << "] " - << "buffer_single_mapped constructor -- history: " << link->history(); - GR_LOG_DEBUG(d_logger, msg.str()); - } -#endif } buffer_single_mapped::~buffer_single_mapped() {} @@ -59,14 +49,21 @@ buffer_single_mapped::~buffer_single_mapped() {} * Allocates underlying buffer. * returns true iff successful. */ -bool buffer_single_mapped::allocate_buffer(int nitems, - size_t sizeof_item, - uint64_t downstream_lcm_nitems) +bool buffer_single_mapped::allocate_buffer(int nitems) { #ifdef BUFFER_DEBUG int orig_nitems = nitems; #endif + // For single mapped buffers resize the initial size to be at least four + // times the size of the largest of any downstream block's output multiple. + // This helps reduce the number of times the input_block_callback() might be + // called which should help overall performance (particularly if the max + // output multiple is large) at the cost of slightly more buffer space. + if (static_cast<uint32_t>(nitems) < (4 * d_max_reader_output_multiple)) { + nitems = 4 * d_max_reader_output_multiple; + } + // Unlike the double mapped buffer case that can easily wrap back onto itself // for both reads and writes the single mapped case needs to be aware of read // and write granularity and size the underlying buffer accordingly. Otherwise @@ -105,33 +102,37 @@ bool buffer_single_mapped::allocate_buffer(int nitems, #endif // Adjust size so output buffer size is a multiple of the write granularity - if (write_granularity != 1 || downstream_lcm_nitems != 1) { - uint64_t size_align_adjust = GR_LCM(write_granularity, downstream_lcm_nitems); + if (write_granularity != 1 || d_downstream_lcm_nitems != 1) { + uint64_t size_align_adjust = GR_LCM(write_granularity, d_downstream_lcm_nitems); uint64_t remainder = nitems % size_align_adjust; nitems += (remainder > 0) ? (size_align_adjust - remainder) : 0; #ifdef BUFFER_DEBUG std::ostringstream msg; msg << "allocate_buffer()** called nitems: " << orig_nitems - << " -- read_multiple: " << downstream_lcm_nitems + << " -- read_multiple: " << d_downstream_lcm_nitems << " -- write_multiple: " << write_granularity << " -- NEW nitems: " << nitems; GR_LOG_DEBUG(d_logger, msg.str()); #endif } - // Allocate a new custom buffer from the owning block - char* buf = buf_owner()->allocate_custom_buffer(nitems * sizeof_item); - assert(buf != nullptr); - d_buffer.reset(buf); - - d_base = d_buffer.get(); d_bufsize = nitems; - d_downstream_lcm_nitems = downstream_lcm_nitems; + d_downstream_lcm_nitems = d_downstream_lcm_nitems; d_write_multiple = write_granularity; - return true; + // Do the actual allocation(s) with the finalized nitems + return do_allocate_buffer(nitems, d_sizeof_item); +} + +bool buffer_single_mapped::input_blkd_cb_ready(int items_required, + unsigned int read_index) +{ + gr::thread::scoped_lock(*this->mutex()); + + return (((d_bufsize - read_index) < (uint32_t)items_required) && + (d_write_index < read_index)); } bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple) @@ -145,70 +146,6 @@ bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple) ((space_avail / output_multiple) * output_multiple == 0)); } - -bool buffer_single_mapped::output_blocked_callback(int output_multiple, bool force) -{ - uint32_t space_avail = space_available(); - -#ifdef BUFFER_DEBUG - std::ostringstream msg; - msg << "[" << this << "] " - << "output_blocked_callback()*** WR_idx: " << d_write_index - << " -- WR items: " << nitems_written() - << " -- output_multiple: " << output_multiple - << " -- space_avail: " << space_avail << " -- force: " << force; - GR_LOG_DEBUG(d_logger, msg.str()); -#endif - - if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) || - force) { - // Find reader with the smallest read index - uint32_t min_read_idx = d_readers[0]->d_read_index; - for (size_t idx = 1; idx < d_readers.size(); ++idx) { - // Record index of reader with minimum read-index - if (d_readers[idx]->d_read_index < min_read_idx) { - min_read_idx = d_readers[idx]->d_read_index; - } - } - -#ifdef BUFFER_DEBUG - std::ostringstream msg; - msg << "[" << this << "] " - << "output_blocked_callback() WR_idx: " << d_write_index - << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx - << " -- shortcircuit: " - << ((min_read_idx == 0) || (min_read_idx >= d_write_index)) - << " -- to_move_items: " << (d_write_index - min_read_idx) - << " -- space_avail: " << space_avail << " -- force: " << force; - GR_LOG_DEBUG(d_logger, msg.str()); -#endif - - // Make sure we have enough room to start writing back at the beginning - if ((min_read_idx == 0) || (min_read_idx >= d_write_index)) { - return false; - } - - // Determine how much "to be read" data needs to be moved - int to_move_items = d_write_index - min_read_idx; - assert(to_move_items > 0); - uint32_t to_move_bytes = to_move_items * d_sizeof_item; - - // Shift "to be read" data back to the beginning of the buffer - std::memmove(d_base, d_base + (min_read_idx * d_sizeof_item), to_move_bytes); - - // Adjust write index and each reader index - d_write_index -= min_read_idx; - - for (size_t idx = 0; idx < d_readers.size(); ++idx) { - d_readers[idx]->d_read_index -= min_read_idx; - } - - return true; - } - - return false; -} - int buffer_single_mapped::space_available() { if (d_readers.empty()) @@ -233,7 +170,7 @@ int buffer_single_mapped::space_available() // For single mapped buffer there is no wrapping beyond the end of the // buffer #ifdef BUFFER_DEBUG - int thecase = 4; // REMOVE ME - just for debug + int thecase = 0; #endif int space = d_bufsize - d_write_index; @@ -259,15 +196,17 @@ int buffer_single_mapped::space_available() #endif space = min_read_index - d_write_index; // Leave extra space in case the reader gets stuck and needs realignment - { - if ((d_write_index > (d_bufsize / 2)) || - (min_read_index < (d_bufsize / 2))) { + if (d_max_reader_output_multiple > 1) { + if (static_cast<uint32_t>(space) > d_max_reader_output_multiple) { #ifdef BUFFER_DEBUG - thecase = 17; + thecase = 4; #endif - space = 0; + space = space - d_max_reader_output_multiple; } else { - space = (d_bufsize / 2) - d_write_index; +#ifdef BUFFER_DEBUG + thecase = 5; +#endif + space = 0; } } } @@ -278,7 +217,6 @@ int buffer_single_mapped::space_available() } #ifdef BUFFER_DEBUG - // BUFFER DEBUG std::ostringstream msg; msg << "[" << this << "] " << "space_available() called (case: " << thecase @@ -286,6 +224,7 @@ int buffer_single_mapped::space_available() << " -- min_read_index: " << min_read_index << " (" << min_idx_reader->nitems_read() << ") " << " -- space: " << space + << " -- max_reader_out_mult: " << d_max_reader_output_multiple << " (sample delay: " << min_idx_reader->sample_delay() << ")"; GR_LOG_DEBUG(d_logger, msg.str()); #endif @@ -294,4 +233,198 @@ int buffer_single_mapped::space_available() } } +void buffer_single_mapped::update_reader_block_history(unsigned history, int delay) +{ + unsigned old_max = d_max_reader_history; + d_max_reader_history = std::max(d_max_reader_history, history); + if (d_max_reader_history != old_max) { + d_write_index = d_max_reader_history - 1; + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "buffer_single_mapped constructor -- set wr index to: " << d_write_index; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Reset the reader's read index if the buffer's write index has changed. + // Note that "history - 1" is the nzero_preload value passed to + // buffer_add_reader. + for (auto reader : d_readers) { + reader->d_read_index = d_write_index - (reader->link()->history() - 1); + } + } + + // Only attempt to set has history flag if it is not already set + if (!d_has_history) { + // Blocks that set delay may set history to delay + 1 but this is + // not "real" history + d_has_history = ((static_cast<int>(history) - 1) != delay); + } +} + +//------------------------------------------------------------------------------ + +bool buffer_single_mapped::input_blocked_callback_logic(int items_required, + int items_avail, + unsigned read_index, + char* buffer_ptr, + memcpy_func_t memcpy_func, + memmove_func_t memmove_func) +{ +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "input_blocked_callback() WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() << " -- BUFSIZE: " << d_bufsize + << " -- RD_idx: " << read_index << " -- items_required: " << items_required + << " -- items_avail: " << items_avail; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Maybe adjust read pointers from min read index? + // This would mean that *all* readers must be > (passed) the write index + if (((d_bufsize - read_index) < (uint32_t)items_required) && + (d_write_index < read_index)) { + + // Find reader with the smallest read index that is greater than the + // write index + uint32_t min_reader_index = std::numeric_limits<uint32_t>::max(); + uint32_t min_read_idx = std::numeric_limits<uint32_t>::max(); + for (size_t idx = 0; idx < d_readers.size(); ++idx) { + if (d_readers[idx]->d_read_index > d_write_index) { + // Record index of reader with minimum read-index + if (d_readers[idx]->d_read_index < min_read_idx) { + min_read_idx = d_readers[idx]->d_read_index; + min_reader_index = idx; + } + } + } + + // Note items_avail might be zero, that's okay. + items_avail += read_index - min_read_idx; + int gap = min_read_idx - d_write_index; + if (items_avail > gap) { + return false; + } + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "input_blocked_callback() WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() << " -- BUFSIZE: " << d_bufsize + << " -- RD_idx: " << min_read_idx; + for (size_t idx = 0; idx < d_readers.size(); ++idx) { + if (idx != min_reader_index) { + msg << " -- OTHER_RDR: " << d_readers[idx]->d_read_index; + } + } + + msg << " -- GAP: " << gap << " -- items_required: " << items_required + << " -- items_avail: " << items_avail; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Shift existing data down to make room for blocked data at end of buffer + uint32_t move_data_size = d_write_index * d_sizeof_item; + char* dest = buffer_ptr + (items_avail * d_sizeof_item); + memmove_func(dest, buffer_ptr, move_data_size); + + // Next copy the data from the end of the buffer back to the beginning + uint32_t avail_data_size = items_avail * d_sizeof_item; + char* src = buffer_ptr + (min_read_idx * d_sizeof_item); + memcpy_func(buffer_ptr, src, avail_data_size); + + // Now adjust write pointer + d_write_index += items_avail; + + // Finally adjust all reader pointers + for (size_t idx = 0; idx < d_readers.size(); ++idx) { + if (idx == min_reader_index) { + d_readers[idx]->d_read_index = 0; + } else { + d_readers[idx]->d_read_index += items_avail; + d_readers[idx]->d_read_index %= d_bufsize; + } + } + + return true; + } + + return false; +} + +bool buffer_single_mapped::output_blocked_callback_logic(int output_multiple, + bool force, + char* buffer_ptr, + memmove_func_t memmove_func) +{ + uint32_t space_avail = space_available(); + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "output_blocked_callback()*** WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() + << " -- output_multiple: " << output_multiple + << " -- space_avail: " << space_avail << " -- force: " << force; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) || + force) { + // Find reader with the smallest read index + uint32_t min_read_idx = d_readers[0]->d_read_index; + uint64_t min_read_idx_nitems = d_readers[0]->nitems_read(); + for (size_t idx = 1; idx < d_readers.size(); ++idx) { + // Record index of reader with minimum read-index + if (d_readers[idx]->d_read_index < min_read_idx) { + min_read_idx = d_readers[idx]->d_read_index; + min_read_idx_nitems = d_readers[idx]->nitems_read(); + } + } + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "output_blocked_callback() WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx + << " -- RD items: " << min_read_idx_nitems << " -- shortcircuit: " + << ((min_read_idx == 0) || (min_read_idx > d_write_index) || + (min_read_idx == d_write_index && + min_read_idx_nitems != nitems_written())) + << " -- to_move_items: " << (d_write_index - min_read_idx) + << " -- space_avail: " << space_avail << " -- force: " << force; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Make sure we have enough room to start writing back at the beginning + if ((min_read_idx == 0) || (min_read_idx > d_write_index) || + (min_read_idx == d_write_index && min_read_idx_nitems != nitems_written())) { + return false; + } + + // Determine how much "to be read" data needs to be moved + int to_move_items = d_write_index - min_read_idx; + if (to_move_items > 0) { + uint32_t to_move_bytes = to_move_items * d_sizeof_item; + + // Shift "to be read" data back to the beginning of the buffer + memmove_func( + buffer_ptr, buffer_ptr + (min_read_idx * d_sizeof_item), to_move_bytes); + } + + // Adjust write index and each reader index + d_write_index -= min_read_idx; + + for (size_t idx = 0; idx < d_readers.size(); ++idx) { + d_readers[idx]->d_read_index -= min_read_idx; + } + + return true; + } + + return false; +} + } /* namespace gr */ |