summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/lib/buffer_single_mapped.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-runtime/lib/buffer_single_mapped.cc')
-rw-r--r-- gnuradio-runtime/lib/buffer_single_mapped.cc | 337
1 files changed, 235 insertions, 102 deletions
diff --git a/gnuradio-runtime/lib/buffer_single_mapped.cc b/gnuradio-runtime/lib/buffer_single_mapped.cc
index 6024396557..db7959c5b4 100644
--- a/gnuradio-runtime/lib/buffer_single_mapped.cc
+++ b/gnuradio-runtime/lib/buffer_single_mapped.cc
@@ -19,6 +19,8 @@
#include <gnuradio/thread/thread.h>
#include <assert.h>
#include <algorithm>
+#include <cstdlib>
+#include <cstring>
#include <iostream>
#include <stdexcept>
@@ -27,30 +29,18 @@ namespace gr {
buffer_single_mapped::buffer_single_mapped(int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
block_sptr link,
block_sptr buf_owner)
- : buffer(BufferMappingType::SingleMapped,
+ : buffer(buffer_mapping_type::single_mapped,
nitems,
sizeof_item,
downstream_lcm_nitems,
+ downstream_max_out_mult,
link),
d_buf_owner(buf_owner),
- d_buffer(nullptr,
- std::bind(&buffer_single_mapped::deleter, this, std::placeholders::_1))
+ d_buffer(nullptr)
{
- gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_single_mapped");
- if (!allocate_buffer(nitems, sizeof_item, downstream_lcm_nitems))
- throw std::bad_alloc();
-
-#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
- {
- std::ostringstream msg;
- msg << "[" << this << "] "
- << "buffer_single_mapped constructor -- history: " << link->history();
- GR_LOG_DEBUG(d_logger, msg.str());
- }
-#endif
}
buffer_single_mapped::~buffer_single_mapped() {}
@@ -59,14 +49,21 @@ buffer_single_mapped::~buffer_single_mapped() {}
* Allocates underlying buffer.
* returns true iff successful.
*/
-bool buffer_single_mapped::allocate_buffer(int nitems,
- size_t sizeof_item,
- uint64_t downstream_lcm_nitems)
+bool buffer_single_mapped::allocate_buffer(int nitems)
{
#ifdef BUFFER_DEBUG
int orig_nitems = nitems;
#endif
+ // For single mapped buffers resize the initial size to be at least four
+ // times the size of the largest of any downstream block's output multiple.
+ // This helps reduce the number of times the input_block_callback() might be
+ // called which should help overall performance (particularly if the max
+ // output multiple is large) at the cost of slightly more buffer space.
+ if (static_cast<uint32_t>(nitems) < (4 * d_max_reader_output_multiple)) {
+ nitems = 4 * d_max_reader_output_multiple;
+ }
+
// Unlike the double mapped buffer case that can easily wrap back onto itself
// for both reads and writes the single mapped case needs to be aware of read
// and write granularity and size the underlying buffer accordingly. Otherwise
@@ -105,33 +102,37 @@ bool buffer_single_mapped::allocate_buffer(int nitems,
#endif
// Adjust size so output buffer size is a multiple of the write granularity
- if (write_granularity != 1 || downstream_lcm_nitems != 1) {
- uint64_t size_align_adjust = GR_LCM(write_granularity, downstream_lcm_nitems);
+ if (write_granularity != 1 || d_downstream_lcm_nitems != 1) {
+ uint64_t size_align_adjust = GR_LCM(write_granularity, d_downstream_lcm_nitems);
uint64_t remainder = nitems % size_align_adjust;
nitems += (remainder > 0) ? (size_align_adjust - remainder) : 0;
#ifdef BUFFER_DEBUG
std::ostringstream msg;
msg << "allocate_buffer()** called nitems: " << orig_nitems
- << " -- read_multiple: " << downstream_lcm_nitems
+ << " -- read_multiple: " << d_downstream_lcm_nitems
<< " -- write_multiple: " << write_granularity
<< " -- NEW nitems: " << nitems;
GR_LOG_DEBUG(d_logger, msg.str());
#endif
}
- // Allocate a new custom buffer from the owning block
- char* buf = buf_owner()->allocate_custom_buffer(nitems * sizeof_item);
- assert(buf != nullptr);
- d_buffer.reset(buf);
-
- d_base = d_buffer.get();
d_bufsize = nitems;
- d_downstream_lcm_nitems = downstream_lcm_nitems;
+ d_downstream_lcm_nitems = d_downstream_lcm_nitems;
d_write_multiple = write_granularity;
- return true;
+ // Do the actual allocation(s) with the finalized nitems
+ return do_allocate_buffer(nitems, d_sizeof_item);
+}
+
+bool buffer_single_mapped::input_blkd_cb_ready(int items_required,
+ unsigned int read_index)
+{
+ gr::thread::scoped_lock(*this->mutex());
+
+ return (((d_bufsize - read_index) < (uint32_t)items_required) &&
+ (d_write_index < read_index));
}
bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple)
@@ -145,70 +146,6 @@ bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple)
((space_avail / output_multiple) * output_multiple == 0));
}
-
-bool buffer_single_mapped::output_blocked_callback(int output_multiple, bool force)
-{
- uint32_t space_avail = space_available();
-
-#ifdef BUFFER_DEBUG
- std::ostringstream msg;
- msg << "[" << this << "] "
- << "output_blocked_callback()*** WR_idx: " << d_write_index
- << " -- WR items: " << nitems_written()
- << " -- output_multiple: " << output_multiple
- << " -- space_avail: " << space_avail << " -- force: " << force;
- GR_LOG_DEBUG(d_logger, msg.str());
-#endif
-
- if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) ||
- force) {
- // Find reader with the smallest read index
- uint32_t min_read_idx = d_readers[0]->d_read_index;
- for (size_t idx = 1; idx < d_readers.size(); ++idx) {
- // Record index of reader with minimum read-index
- if (d_readers[idx]->d_read_index < min_read_idx) {
- min_read_idx = d_readers[idx]->d_read_index;
- }
- }
-
-#ifdef BUFFER_DEBUG
- std::ostringstream msg;
- msg << "[" << this << "] "
- << "output_blocked_callback() WR_idx: " << d_write_index
- << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx
- << " -- shortcircuit: "
- << ((min_read_idx == 0) || (min_read_idx >= d_write_index))
- << " -- to_move_items: " << (d_write_index - min_read_idx)
- << " -- space_avail: " << space_avail << " -- force: " << force;
- GR_LOG_DEBUG(d_logger, msg.str());
-#endif
-
- // Make sure we have enough room to start writing back at the beginning
- if ((min_read_idx == 0) || (min_read_idx >= d_write_index)) {
- return false;
- }
-
- // Determine how much "to be read" data needs to be moved
- int to_move_items = d_write_index - min_read_idx;
- assert(to_move_items > 0);
- uint32_t to_move_bytes = to_move_items * d_sizeof_item;
-
- // Shift "to be read" data back to the beginning of the buffer
- std::memmove(d_base, d_base + (min_read_idx * d_sizeof_item), to_move_bytes);
-
- // Adjust write index and each reader index
- d_write_index -= min_read_idx;
-
- for (size_t idx = 0; idx < d_readers.size(); ++idx) {
- d_readers[idx]->d_read_index -= min_read_idx;
- }
-
- return true;
- }
-
- return false;
-}
-
int buffer_single_mapped::space_available()
{
if (d_readers.empty())
@@ -233,7 +170,7 @@ int buffer_single_mapped::space_available()
// For single mapped buffer there is no wrapping beyond the end of the
// buffer
#ifdef BUFFER_DEBUG
- int thecase = 4; // REMOVE ME - just for debug
+ int thecase = 0;
#endif
int space = d_bufsize - d_write_index;
@@ -259,15 +196,17 @@ int buffer_single_mapped::space_available()
#endif
space = min_read_index - d_write_index;
// Leave extra space in case the reader gets stuck and needs realignment
- {
- if ((d_write_index > (d_bufsize / 2)) ||
- (min_read_index < (d_bufsize / 2))) {
+ if (d_max_reader_output_multiple > 1) {
+ if (static_cast<uint32_t>(space) > d_max_reader_output_multiple) {
#ifdef BUFFER_DEBUG
- thecase = 17;
+ thecase = 4;
#endif
- space = 0;
+ space = space - d_max_reader_output_multiple;
} else {
- space = (d_bufsize / 2) - d_write_index;
+#ifdef BUFFER_DEBUG
+ thecase = 5;
+#endif
+ space = 0;
}
}
}
@@ -278,7 +217,6 @@ int buffer_single_mapped::space_available()
}
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
std::ostringstream msg;
msg << "[" << this << "] "
<< "space_available() called (case: " << thecase
@@ -286,6 +224,7 @@ int buffer_single_mapped::space_available()
<< " -- min_read_index: " << min_read_index << " ("
<< min_idx_reader->nitems_read() << ") "
<< " -- space: " << space
+ << " -- max_reader_out_mult: " << d_max_reader_output_multiple
<< " (sample delay: " << min_idx_reader->sample_delay() << ")";
GR_LOG_DEBUG(d_logger, msg.str());
#endif
@@ -294,4 +233,198 @@ int buffer_single_mapped::space_available()
}
}
+void buffer_single_mapped::update_reader_block_history(unsigned history, int delay)
+{
+ unsigned old_max = d_max_reader_history;
+ d_max_reader_history = std::max(d_max_reader_history, history);
+ if (d_max_reader_history != old_max) {
+ d_write_index = d_max_reader_history - 1;
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "buffer_single_mapped constructor -- set wr index to: " << d_write_index;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // Reset the reader's read index if the buffer's write index has changed.
+ // Note that "history - 1" is the nzero_preload value passed to
+ // buffer_add_reader.
+ for (auto reader : d_readers) {
+ reader->d_read_index = d_write_index - (reader->link()->history() - 1);
+ }
+ }
+
+ // Only attempt to set has history flag if it is not already set
+ if (!d_has_history) {
+ // Blocks that set delay may set history to delay + 1 but this is
+ // not "real" history
+ d_has_history = ((static_cast<int>(history) - 1) != delay);
+ }
+}
+
+//------------------------------------------------------------------------------
+
/*!
 * Core logic shared by the input-blocked callbacks: compact the buffer so a
 * blocked reader can see a contiguous run of at least items_required items.
 *
 * The memcpy/memmove functions are injected so derived classes can supply
 * buffer-specific copy routines (e.g. for custom/device buffers) -- the index
 * arithmetic stays identical either way.
 *
 * \return true if data was moved and write/read indices were adjusted,
 *         false if the precondition for compaction did not hold.
 *
 * NOTE(review): no lock is taken here, unlike input_blkd_cb_ready(); callers
 * presumably hold the buffer mutex already -- confirm at call sites.
 */
bool buffer_single_mapped::input_blocked_callback_logic(int items_required,
                                                        int items_avail,
                                                        unsigned read_index,
                                                        char* buffer_ptr,
                                                        memcpy_func_t memcpy_func,
                                                        memmove_func_t memmove_func)
{
#ifdef BUFFER_DEBUG
    std::ostringstream msg;
    msg << "[" << this << "] "
        << "input_blocked_callback() WR_idx: " << d_write_index
        << " -- WR items: " << nitems_written() << " -- BUFSIZE: " << d_bufsize
        << " -- RD_idx: " << read_index << " -- items_required: " << items_required
        << " -- items_avail: " << items_avail;
    GR_LOG_DEBUG(d_logger, msg.str());
#endif

    // Maybe adjust read pointers from min read index?
    // This would mean that *all* readers must be > (passed) the write index
    if (((d_bufsize - read_index) < (uint32_t)items_required) &&
        (d_write_index < read_index)) {

        // Find reader with the smallest read index that is greater than the
        // write index
        uint32_t min_reader_index = std::numeric_limits<uint32_t>::max();
        uint32_t min_read_idx = std::numeric_limits<uint32_t>::max();
        for (size_t idx = 0; idx < d_readers.size(); ++idx) {
            if (d_readers[idx]->d_read_index > d_write_index) {
                // Record index of reader with minimum read-index
                if (d_readers[idx]->d_read_index < min_read_idx) {
                    min_read_idx = d_readers[idx]->d_read_index;
                    min_reader_index = idx;
                }
            }
        }

        // Note items_avail might be zero, that's okay.
        // Total data to relocate: what the caller saw plus anything between
        // the minimum reader and the passed read index.
        items_avail += read_index - min_read_idx;
        int gap = min_read_idx - d_write_index;
        // The relocated data must fit in the gap between the write index and
        // the minimum read index, otherwise the shift below would clobber
        // unread data.
        if (items_avail > gap) {
            return false;
        }

#ifdef BUFFER_DEBUG
        std::ostringstream msg;
        msg << "[" << this << "] "
            << "input_blocked_callback() WR_idx: " << d_write_index
            << " -- WR items: " << nitems_written() << " -- BUFSIZE: " << d_bufsize
            << " -- RD_idx: " << min_read_idx;
        for (size_t idx = 0; idx < d_readers.size(); ++idx) {
            if (idx != min_reader_index) {
                msg << " -- OTHER_RDR: " << d_readers[idx]->d_read_index;
            }
        }

        msg << " -- GAP: " << gap << " -- items_required: " << items_required
            << " -- items_avail: " << items_avail;
        GR_LOG_DEBUG(d_logger, msg.str());
#endif

        // Shift existing data down to make room for blocked data at end of buffer
        uint32_t move_data_size = d_write_index * d_sizeof_item;
        char* dest = buffer_ptr + (items_avail * d_sizeof_item);
        // memmove semantics required: [0, move_data_size) may overlap dest.
        memmove_func(dest, buffer_ptr, move_data_size);

        // Next copy the data from the end of the buffer back to the beginning
        uint32_t avail_data_size = items_avail * d_sizeof_item;
        char* src = buffer_ptr + (min_read_idx * d_sizeof_item);
        memcpy_func(buffer_ptr, src, avail_data_size);

        // Now adjust write pointer
        d_write_index += items_avail;

        // Finally adjust all reader pointers
        for (size_t idx = 0; idx < d_readers.size(); ++idx) {
            if (idx == min_reader_index) {
                // The blocked reader's data now starts at the buffer base.
                d_readers[idx]->d_read_index = 0;
            } else {
                d_readers[idx]->d_read_index += items_avail;
                d_readers[idx]->d_read_index %= d_bufsize;
            }
        }

        return true;
    }

    return false;
}
+
/*!
 * Core logic shared by the output-blocked callbacks: when the remaining free
 * space cannot satisfy one output_multiple (or force is set), slide all
 * not-yet-read data back to the start of the buffer so the writer regains a
 * contiguous region.
 *
 * The memmove function is injected so derived classes can supply a
 * buffer-specific move routine; the index arithmetic is identical either way.
 *
 * \return true if data was moved and indices adjusted, false if the move was
 *         unnecessary or impossible (short-circuit cases below).
 */
bool buffer_single_mapped::output_blocked_callback_logic(int output_multiple,
                                                         bool force,
                                                         char* buffer_ptr,
                                                         memmove_func_t memmove_func)
{
    uint32_t space_avail = space_available();

#ifdef BUFFER_DEBUG
    std::ostringstream msg;
    msg << "[" << this << "] "
        << "output_blocked_callback()*** WR_idx: " << d_write_index
        << " -- WR items: " << nitems_written()
        << " -- output_multiple: " << output_multiple
        << " -- space_avail: " << space_avail << " -- force: " << force;
    GR_LOG_DEBUG(d_logger, msg.str());
#endif

    // (space_avail / output_multiple) * output_multiple == 0 means there is
    // some space but less than one full output_multiple.
    if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) ||
        force) {
        // Find reader with the smallest read index
        uint32_t min_read_idx = d_readers[0]->d_read_index;
        uint64_t min_read_idx_nitems = d_readers[0]->nitems_read();
        for (size_t idx = 1; idx < d_readers.size(); ++idx) {
            // Record index of reader with minimum read-index
            if (d_readers[idx]->d_read_index < min_read_idx) {
                min_read_idx = d_readers[idx]->d_read_index;
                min_read_idx_nitems = d_readers[idx]->nitems_read();
            }
        }

#ifdef BUFFER_DEBUG
        std::ostringstream msg;
        msg << "[" << this << "] "
            << "output_blocked_callback() WR_idx: " << d_write_index
            << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx
            << " -- RD items: " << min_read_idx_nitems << " -- shortcircuit: "
            << ((min_read_idx == 0) || (min_read_idx > d_write_index) ||
                (min_read_idx == d_write_index &&
                 min_read_idx_nitems != nitems_written()))
            << " -- to_move_items: " << (d_write_index - min_read_idx)
            << " -- space_avail: " << space_avail << " -- force: " << force;
        GR_LOG_DEBUG(d_logger, msg.str());
#endif

        // Make sure we have enough room to start writing back at the beginning
        // Short-circuit when: already at the base, a reader is past the
        // writer, or indices are equal but the reader still has items pending.
        if ((min_read_idx == 0) || (min_read_idx > d_write_index) ||
            (min_read_idx == d_write_index && min_read_idx_nitems != nitems_written())) {
            return false;
        }

        // Determine how much "to be read" data needs to be moved
        int to_move_items = d_write_index - min_read_idx;
        if (to_move_items > 0) {
            uint32_t to_move_bytes = to_move_items * d_sizeof_item;

            // Shift "to be read" data back to the beginning of the buffer
            memmove_func(
                buffer_ptr, buffer_ptr + (min_read_idx * d_sizeof_item), to_move_bytes);
        }

        // Adjust write index and each reader index
        d_write_index -= min_read_idx;

        for (size_t idx = 0; idx < d_readers.size(); ++idx) {
            d_readers[idx]->d_read_index -= min_read_idx;
        }

        return true;
    }

    return false;
}
+
} /* namespace gr */