summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/lib/buffer_reader_sm.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-runtime/lib/buffer_reader_sm.cc')
-rw-r--r--gnuradio-runtime/lib/buffer_reader_sm.cc149
1 files changed, 149 insertions, 0 deletions
diff --git a/gnuradio-runtime/lib/buffer_reader_sm.cc b/gnuradio-runtime/lib/buffer_reader_sm.cc
new file mode 100644
index 0000000000..9e0fac584f
--- /dev/null
+++ b/gnuradio-runtime/lib/buffer_reader_sm.cc
@@ -0,0 +1,149 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2009,2010,2013 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include <gnuradio/block.h>
+#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_reader_sm.h>
+#include <gnuradio/integer_math.h>
+#include <gnuradio/math.h>
+#include <assert.h>
+#include <algorithm>
+#include <iostream>
+#include <limits>
+#include <stdexcept>
+
+namespace gr {
+
+buffer_reader_sm::~buffer_reader_sm() {}
+
+int buffer_reader_sm::items_available()
+{
+ int available = 0;
+
+ if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) {
+ if (d_buffer->d_write_index == d_read_index) {
+ if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) {
+ available = d_buffer->d_bufsize - d_read_index;
+ }
+ } else {
+ available = d_buffer->index_sub(d_buffer->d_write_index, d_read_index);
+ }
+ }
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << d_buffer << ";" << this << "] "
+ << "items_available() WR_idx: " << d_buffer->d_write_index
+ << " -- WR items: " << d_buffer->nitems_written()
+ << " -- RD_idx: " << d_read_index << " -- RD items: " << nitems_read() << " (-"
+ << d_attr_delay << ") -- available: " << available;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ return available;
+}
+
+bool buffer_reader_sm::input_blkd_cb_ready(int items_required) const
+{
+ gr::thread::scoped_lock(*d_buffer->mutex());
+
+ return (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) &&
+ (d_buffer->d_write_index < d_read_index));
+}
+
+bool buffer_reader_sm::input_blocked_callback(int items_required, int items_avail)
+{
+ // Maybe adjust read pointers from min read index?
+ // This would mean that *all* readers must be > (passed) the write index
+ if (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) &&
+ (d_buffer->d_write_index < d_read_index)) {
+
+ // Update items available before going farther as it could be stale
+ items_avail = items_available();
+
+ // Find reader with the smallest read index that is greater than the
+ // write index
+ uint32_t min_reader_index = std::numeric_limits<uint32_t>::max();
+ uint32_t min_read_idx = std::numeric_limits<uint32_t>::max();
+ for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
+ if (d_buffer->d_readers[idx]->d_read_index > d_buffer->d_write_index) {
+ // Record index of reader with minimum read-index
+ if (d_buffer->d_readers[idx]->d_read_index < min_read_idx) {
+ min_read_idx = d_buffer->d_readers[idx]->d_read_index;
+ min_reader_index = idx;
+ }
+ }
+ }
+
+ // Note items_avail might be zero, that's okay.
+ items_avail += d_read_index - min_read_idx;
+ int gap = min_read_idx - d_buffer->d_write_index;
+ if (items_avail > gap) {
+ return false;
+ }
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << d_buffer << ";" << this << "] "
+ << "input_blocked_callback() WR_idx: " << d_buffer->d_write_index
+ << " -- WR items: " << d_buffer->nitems_written()
+ << " -- BUFSIZE: " << d_buffer->d_bufsize << " -- RD_idx: " << min_read_idx;
+ for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
+ if (idx != min_reader_index) {
+ msg << " -- OTHER_RDR: " << d_buffer->d_readers[idx]->d_read_index;
+ }
+ }
+
+ msg << " -- GAP: " << gap << " -- items_required: " << items_required
+ << " -- items_avail: " << items_avail;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // Shift existing data down to make room for blocked data at end of buffer
+ uint32_t move_data_size = d_buffer->d_write_index * d_buffer->d_sizeof_item;
+ char* dest = d_buffer->d_base + (items_avail * d_buffer->d_sizeof_item);
+ std::memmove(dest, d_buffer->d_base, move_data_size);
+
+ // Next copy the data from the end of the buffer back to the beginning
+ uint32_t avail_data_size = items_avail * d_buffer->d_sizeof_item;
+ char* src = d_buffer->d_base + (min_read_idx * d_buffer->d_sizeof_item);
+ std::memcpy(d_buffer->d_base, src, avail_data_size);
+
+ // Now adjust write pointer
+ d_buffer->d_write_index += items_avail;
+
+ // Finally adjust all reader pointers
+ for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
+ if (idx == min_reader_index) {
+ d_buffer->d_readers[idx]->d_read_index = 0;
+ } else {
+ d_buffer->d_readers[idx]->d_read_index += items_avail;
+ d_buffer->d_readers[idx]->d_read_index %= d_buffer->d_bufsize;
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+}
+
+buffer_reader_sm::buffer_reader_sm(buffer_sptr buffer,
+ unsigned int read_index,
+ block_sptr link)
+ : buffer_reader(buffer, read_index, link)
+{
+}
+
+
+} /* namespace gr */