summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/lib/buffer_reader_sm.cc
diff options
context:
space:
mode:
authorDavid Sorber <david.sorber@blacklynx.tech>2021-05-12 08:59:21 -0400
committermormj <34754695+mormj@users.noreply.github.com>2021-10-25 11:27:01 -0400
commit788827ae116bef871e144abd39b1e4482208eabe (patch)
treedcfee04a77db5bb3c8042be5b0b95c54bf8759c9 /gnuradio-runtime/lib/buffer_reader_sm.cc
parentb8713810a2d07ac1a632bd7bfb23f3f48f67e222 (diff)
runtime: Custom Buffer/Accelerator Device Support - Milestone 1
Custom Buffer/Accelerator Device Support - Milestone 1 changes: * Refactored existing single mapped buffer code and created single mapped buffer abstraction; wrapping within single mapped buffers is handled explicitly by input blocked and output blocked callbacks that are called from block_executor * Added simple custom buffer allocation interface (NOTE: this interface will change for milestone 2) * Accelerated blocks are still responsible for data transfer but the custom buffer interface eliminates the double copy problem Signed-off-by: David Sorber <david.sorber@blacklynx.tech>
Diffstat (limited to 'gnuradio-runtime/lib/buffer_reader_sm.cc')
-rw-r--r--gnuradio-runtime/lib/buffer_reader_sm.cc149
1 files changed, 149 insertions, 0 deletions
diff --git a/gnuradio-runtime/lib/buffer_reader_sm.cc b/gnuradio-runtime/lib/buffer_reader_sm.cc
new file mode 100644
index 0000000000..9e0fac584f
--- /dev/null
+++ b/gnuradio-runtime/lib/buffer_reader_sm.cc
@@ -0,0 +1,149 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2009,2010,2013 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include <gnuradio/block.h>
+#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_reader_sm.h>
+#include <gnuradio/integer_math.h>
+#include <gnuradio/math.h>
+#include <assert.h>
+#include <algorithm>
+#include <iostream>
+#include <limits>
+#include <stdexcept>
+
+namespace gr {
+
+buffer_reader_sm::~buffer_reader_sm() {}
+
+int buffer_reader_sm::items_available()
+{
+ int available = 0;
+
+ if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) {
+ if (d_buffer->d_write_index == d_read_index) {
+ if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) {
+ available = d_buffer->d_bufsize - d_read_index;
+ }
+ } else {
+ available = d_buffer->index_sub(d_buffer->d_write_index, d_read_index);
+ }
+ }
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << d_buffer << ";" << this << "] "
+ << "items_available() WR_idx: " << d_buffer->d_write_index
+ << " -- WR items: " << d_buffer->nitems_written()
+ << " -- RD_idx: " << d_read_index << " -- RD items: " << nitems_read() << " (-"
+ << d_attr_delay << ") -- available: " << available;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ return available;
+}
+
+bool buffer_reader_sm::input_blkd_cb_ready(int items_required) const
+{
+ gr::thread::scoped_lock(*d_buffer->mutex());
+
+ return (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) &&
+ (d_buffer->d_write_index < d_read_index));
+}
+
+bool buffer_reader_sm::input_blocked_callback(int items_required, int items_avail)
+{
+ // Maybe adjust read pointers from min read index?
+ // This would mean that *all* readers must be > (passed) the write index
+ if (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) &&
+ (d_buffer->d_write_index < d_read_index)) {
+
+ // Update items available before going farther as it could be stale
+ items_avail = items_available();
+
+ // Find reader with the smallest read index that is greater than the
+ // write index
+ uint32_t min_reader_index = std::numeric_limits<uint32_t>::max();
+ uint32_t min_read_idx = std::numeric_limits<uint32_t>::max();
+ for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
+ if (d_buffer->d_readers[idx]->d_read_index > d_buffer->d_write_index) {
+ // Record index of reader with minimum read-index
+ if (d_buffer->d_readers[idx]->d_read_index < min_read_idx) {
+ min_read_idx = d_buffer->d_readers[idx]->d_read_index;
+ min_reader_index = idx;
+ }
+ }
+ }
+
+ // Note items_avail might be zero, that's okay.
+ items_avail += d_read_index - min_read_idx;
+ int gap = min_read_idx - d_buffer->d_write_index;
+ if (items_avail > gap) {
+ return false;
+ }
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << d_buffer << ";" << this << "] "
+ << "input_blocked_callback() WR_idx: " << d_buffer->d_write_index
+ << " -- WR items: " << d_buffer->nitems_written()
+ << " -- BUFSIZE: " << d_buffer->d_bufsize << " -- RD_idx: " << min_read_idx;
+ for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
+ if (idx != min_reader_index) {
+ msg << " -- OTHER_RDR: " << d_buffer->d_readers[idx]->d_read_index;
+ }
+ }
+
+ msg << " -- GAP: " << gap << " -- items_required: " << items_required
+ << " -- items_avail: " << items_avail;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // Shift existing data down to make room for blocked data at end of buffer
+ uint32_t move_data_size = d_buffer->d_write_index * d_buffer->d_sizeof_item;
+ char* dest = d_buffer->d_base + (items_avail * d_buffer->d_sizeof_item);
+ std::memmove(dest, d_buffer->d_base, move_data_size);
+
+ // Next copy the data from the end of the buffer back to the beginning
+ uint32_t avail_data_size = items_avail * d_buffer->d_sizeof_item;
+ char* src = d_buffer->d_base + (min_read_idx * d_buffer->d_sizeof_item);
+ std::memcpy(d_buffer->d_base, src, avail_data_size);
+
+ // Now adjust write pointer
+ d_buffer->d_write_index += items_avail;
+
+ // Finally adjust all reader pointers
+ for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
+ if (idx == min_reader_index) {
+ d_buffer->d_readers[idx]->d_read_index = 0;
+ } else {
+ d_buffer->d_readers[idx]->d_read_index += items_avail;
+ d_buffer->d_readers[idx]->d_read_index %= d_buffer->d_bufsize;
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+}
+
+buffer_reader_sm::buffer_reader_sm(buffer_sptr buffer,
+ unsigned int read_index,
+ block_sptr link)
+ : buffer_reader(buffer, read_index, link)
+{
+}
+
+
+} /* namespace gr */