From 788827ae116bef871e144abd39b1e4482208eabe Mon Sep 17 00:00:00 2001
From: David Sorber <david.sorber@blacklynx.tech>
Date: Wed, 12 May 2021 08:59:21 -0400
Subject: runtime: Custom Buffer/Accelerator Device Support - Milestone 1

Custom Buffer/Accelerator Device Support - Milestone 1 changes:

    * Refactored existing double mapped buffer code and created single
      mapped buffer abstraction; wrapping within single mapped buffers
      is handled explicitly by input blocked and output blocked
      callbacks that are called from block_executor
    * Added simple custom buffer allocation interface (NOTE: this
      interface will change for milestone 2)
    * Accelerated blocks are still responsible for data transfer but the
      custom buffer interface eliminates the double copy problem

Signed-off-by: David Sorber <david.sorber@blacklynx.tech>
---
 gnuradio-runtime/lib/buffer.cc | 273 ++++++++++++++---------------------------
 1 file changed, 94 insertions(+), 179 deletions(-)

(limited to 'gnuradio-runtime/lib/buffer.cc')

diff --git a/gnuradio-runtime/lib/buffer.cc b/gnuradio-runtime/lib/buffer.cc
index cb6b949932..7fd0a39579 100644
--- a/gnuradio-runtime/lib/buffer.cc
+++ b/gnuradio-runtime/lib/buffer.cc
@@ -13,6 +13,10 @@
 #endif
 #include "vmcircbuf.h"
 #include <gnuradio/buffer.h>
+#include <gnuradio/buffer_double_mapped.h>
+#include <gnuradio/buffer_reader.h>
+#include <gnuradio/buffer_single_mapped.h>
+#include <gnuradio/buffer_type.h>
 #include <gnuradio/integer_math.h>
 #include <gnuradio/math.h>
 #include <boost/format.hpp>
@@ -23,7 +27,6 @@
 namespace gr {
 
 static long s_buffer_count = 0; // counts for debugging storage mgmt
-static long s_buffer_reader_count = 0;
 
 /* ----------------------------------------------------------------------------
                       Notes on storage management
@@ -52,117 +55,81 @@ static long s_buffer_reader_count = 0;
  gr::buffer_reader goes to zero, we can successfully reclaim it.
  ---------------------------------------------------------------------------- */
 
-/*
- * Compute the minimum number of buffer items that work (i.e.,
- * address space wrap-around works).  To work is to satisfy this
- * constraint for integer buffer_size and k:
- *
- *     type_size * nitems == k * page_size
- */
-static inline long minimum_buffer_items(long type_size, long page_size)
-{
-    return page_size / GR_GCD(type_size, page_size);
-}
-
 
-buffer::buffer(int nitems, size_t sizeof_item, block_sptr link)
+buffer::buffer(BufferMappingType buf_type,
+               int nitems,
+               size_t sizeof_item,
+               uint64_t downstream_lcm_nitems,
+               block_sptr link)
     : d_base(0),
       d_bufsize(0),
+      d_buf_map_type(buf_type),
       d_max_reader_delay(0),
+      d_max_reader_history(1),
+      d_has_history(false),
       d_sizeof_item(sizeof_item),
       d_link(link),
       d_write_index(0),
       d_abs_write_offset(0),
       d_done(false),
-      d_last_min_items_read(0)
+      d_last_min_items_read(0),
+      d_callback_flag(false),
+      d_active_pointer_counter(0),
+      d_downstream_lcm_nitems(downstream_lcm_nitems),
+      d_write_multiple(0)
 {
     gr::configure_default_loggers(d_logger, d_debug_logger, "buffer");
-    if (!allocate_buffer(nitems, sizeof_item))
-        throw std::bad_alloc();
 
     s_buffer_count++;
 }
 
-buffer_sptr make_buffer(int nitems, size_t sizeof_item, block_sptr link)
+buffer_sptr make_buffer(int nitems,
+                        size_t sizeof_item,
+                        uint64_t downstream_lcm_nitems,
+                        block_sptr link,
+                        block_sptr buf_owner)
 {
-    return buffer_sptr(new buffer(nitems, sizeof_item, link));
-}
+#ifdef BUFFER_DEBUG
+    // BUFFER DEBUG
+    gr::logger_ptr logger;
+    gr::logger_ptr debug_logger;
+    gr::configure_default_loggers(logger, debug_logger, "make_buffer");
+    std::ostringstream msg;
+#endif
 
-buffer::~buffer()
-{
-    assert(d_readers.size() == 0);
-    s_buffer_count--;
-}
+#if DEBUG_SINGLE_MAPPED
+    if (1) {
+#else
+    if (buf_owner->get_buffer_type() != buftype_DEFAULT_NON_CUSTOM::get()) {
+#endif
+        // Buffer type is NOT the default non custom variety so allocate a
+        // buffer_single_mapped instance
+#ifdef BUFFER_DEBUG
+        msg << "buffer_single_mapped nitems: " << nitems
+            << " -- sizeof_item: " << sizeof_item;
+        GR_LOG_DEBUG(logger, msg.str());
+#endif
 
-/*!
- * sets d_vmcircbuf, d_base, d_bufsize.
- * returns true iff successful.
- */
-bool buffer::allocate_buffer(int nitems, size_t sizeof_item)
-{
-    int orig_nitems = nitems;
-
-    // Any buffersize we come up with must be a multiple of min_nitems.
-    int granularity = gr::vmcircbuf_sysconfig::granularity();
-    int min_nitems = minimum_buffer_items(sizeof_item, granularity);
-
-    // Round-up nitems to a multiple of min_nitems.
-    if (nitems % min_nitems != 0)
-        nitems = ((nitems / min_nitems) + 1) * min_nitems;
-
-    // If we rounded-up a whole bunch, give the user a heads up.
-    // This only happens if sizeof_item is not a power of two.
-
-    if (nitems > 2 * orig_nitems && nitems * (int)sizeof_item > granularity) {
-        auto msg =
-            str(boost::format(
-                    "allocate_buffer: tried to allocate"
-                    "   %d items of size %d. Due to alignment requirements"
-                    "   %d were allocated.  If this isn't OK, consider padding"
-                    "   your structure to a power-of-two bytes."
-                    "   On this platform, our allocation granularity is %d bytes.") %
-                orig_nitems % sizeof_item % nitems % granularity);
-        GR_LOG_WARN(d_logger, msg.c_str());
-    }
+        return buffer_sptr(new buffer_single_mapped(
+            nitems, sizeof_item, downstream_lcm_nitems, link, buf_owner));
 
-    d_bufsize = nitems;
-    d_vmcircbuf.reset(gr::vmcircbuf_sysconfig::make(d_bufsize * d_sizeof_item));
-    if (d_vmcircbuf == 0) {
-        std::ostringstream msg;
-        msg << "gr::buffer::allocate_buffer: failed to allocate buffer of size "
-            << d_bufsize * d_sizeof_item / 1024 << " KB";
-        GR_LOG_ERROR(d_logger, msg.str());
-        return false;
-    }
+    } else {
+        // Default to allocating a buffer_double_mapped instance
+#ifdef BUFFER_DEBUG
+        msg << "buffer_double_mapped nitems: " << nitems
+            << " -- sizeof_item: " << sizeof_item;
+        GR_LOG_DEBUG(logger, msg.str());
+#endif
 
-    d_base = (char*)d_vmcircbuf->pointer_to_first_copy();
-    return true;
+        return buffer_sptr(
+            new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link));
+    }
 }
 
-int buffer::space_available()
+buffer::~buffer()
 {
-    if (d_readers.empty())
-        return d_bufsize - 1; // See comment below
-
-    else {
-        // Find out the maximum amount of data available to our readers
-
-        int most_data = d_readers[0]->items_available();
-        uint64_t min_items_read = d_readers[0]->nitems_read();
-        for (size_t i = 1; i < d_readers.size(); i++) {
-            most_data = std::max(most_data, d_readers[i]->items_available());
-            min_items_read = std::min(min_items_read, d_readers[i]->nitems_read());
-        }
-
-        if (min_items_read != d_last_min_items_read) {
-            prune_tags(d_last_min_items_read);
-            d_last_min_items_read = min_items_read;
-        }
-
-        // The -1 ensures that the case d_write_index == d_read_index is
-        // unambiguous.  It indicates that there is no data for the reader
-        return d_bufsize - most_data - 1;
-    }
+    assert(d_readers.size() == 0);
+    s_buffer_count--;
 }
 
 void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; }
@@ -170,8 +137,22 @@ void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; }
 void buffer::update_write_pointer(int nitems)
 {
     gr::thread::scoped_lock guard(*mutex());
+
+#ifdef BUFFER_DEBUG
+    // BUFFER DEBUG
+    unsigned orig_wr_idx = d_write_index;
+#endif
+
     d_write_index = index_add(d_write_index, nitems);
     d_abs_write_offset += nitems;
+
+#ifdef BUFFER_DEBUG
+    // BUFFER DEBUG
+    std::ostringstream msg;
+    msg << "[" << this << "] update_write_pointer -- orig d_write_index: " << orig_wr_idx
+        << " -- nitems: " << nitems << " -- d_write_index: " << d_write_index;
+    GR_LOG_DEBUG(d_logger, msg.str());
+#endif
 }
 
 void buffer::set_done(bool done)
@@ -180,20 +161,6 @@ void buffer::set_done(bool done)
     d_done = done;
 }
 
-buffer_reader_sptr
-buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay)
-{
-    if (nzero_preload < 0)
-        throw std::invalid_argument("buffer_add_reader: nzero_preload must be >= 0");
-
-    buffer_reader_sptr r(
-        new buffer_reader(buf, buf->index_sub(buf->d_write_index, nzero_preload), link));
-    r->declare_sample_delay(delay);
-    buf->d_readers.push_back(r.get());
-
-    return r;
-}
-
 void buffer::drop_reader(buffer_reader* reader)
 {
     std::vector<buffer_reader*>::iterator result =
@@ -263,92 +230,40 @@ void buffer::prune_tags(uint64_t max_time)
     }
 }
 
-long buffer_ncurrently_allocated() { return s_buffer_count; }
-
-// ----------------------------------------------------------------------------
-
-buffer_reader::buffer_reader(buffer_sptr buffer, unsigned int read_index, block_sptr link)
-    : d_buffer(buffer),
-      d_read_index(read_index),
-      d_abs_read_offset(0),
-      d_link(link),
-      d_attr_delay(0)
-{
-    s_buffer_reader_count++;
-}
-
-buffer_reader::~buffer_reader()
+void buffer::on_lock(gr::thread::scoped_lock& lock)
 {
-    d_buffer->drop_reader(this);
-    s_buffer_reader_count--;
+    // NOTE: the protecting mutex (scoped_lock) is held by the custom_lock object
+
+    // Wait until no other callback is active and no pointers are active for
+    // the buffer, then mark the callback flag active.
+    d_cv.wait(lock, [this]() {
+        return (d_callback_flag == false && d_active_pointer_counter == 0);
+    });
+    d_callback_flag = true;
 }
 
-void buffer_reader::declare_sample_delay(unsigned delay)
+void buffer::on_unlock()
 {
-    d_attr_delay = delay;
-    d_buffer->d_max_reader_delay = std::max(d_attr_delay, d_buffer->d_max_reader_delay);
-}
-
-unsigned buffer_reader::sample_delay() const { return d_attr_delay; }
-
-int buffer_reader::items_available() const
-{
-    return d_buffer->index_sub(d_buffer->d_write_index, d_read_index);
-}
+    // NOTE: the protecting mutex (scoped_lock) is held by the custom_lock object
 
-const void* buffer_reader::read_pointer()
-{
-    return &d_buffer->d_base[d_read_index * d_buffer->d_sizeof_item];
+    // Mark the callback flag inactive and notify anyone waiting
+    d_callback_flag = false;
+    d_cv.notify_all();
 }
 
-void buffer_reader::update_read_pointer(int nitems)
-{
-    gr::thread::scoped_lock guard(*mutex());
-    d_read_index = d_buffer->index_add(d_read_index, nitems);
-    d_abs_read_offset += nitems;
-}
+long buffer_ncurrently_allocated() { return s_buffer_count; }
 
-void buffer_reader::get_tags_in_range(std::vector<tag_t>& v,
-                                      uint64_t abs_start,
-                                      uint64_t abs_end,
-                                      long id)
+std::ostream& operator<<(std::ostream& os, const buffer& buf)
 {
-    gr::thread::scoped_lock guard(*mutex());
-
-    uint64_t lower_bound = abs_start - d_attr_delay;
-    // check for underflow and if so saturate at 0
-    if (lower_bound > abs_start)
-        lower_bound = 0;
-    uint64_t upper_bound = abs_end - d_attr_delay;
-    // check for underflow and if so saturate at 0
-    if (upper_bound > abs_end)
-        upper_bound = 0;
-
-    v.clear();
-    std::multimap<uint64_t, tag_t>::iterator itr =
-        d_buffer->get_tags_lower_bound(lower_bound);
-    std::multimap<uint64_t, tag_t>::iterator itr_end =
-        d_buffer->get_tags_upper_bound(upper_bound);
-
-    uint64_t item_time;
-    while (itr != itr_end) {
-        item_time = (*itr).second.offset + d_attr_delay;
-        if ((item_time >= abs_start) && (item_time < abs_end)) {
-            std::vector<long>::iterator id_itr;
-            id_itr = std::find(
-                itr->second.marked_deleted.begin(), itr->second.marked_deleted.end(), id);
-            // If id is not in the vector of marked blocks
-            if (id_itr == itr->second.marked_deleted.end()) {
-                tag_t t = (*itr).second;
-                t.offset += d_attr_delay;
-                v.push_back(t);
-                v.back().marked_deleted.clear();
-            }
-        }
-        itr++;
+    os << std::endl
+       << "  sz:    " << buf.d_bufsize << std::endl
+       << "  nrdrs: " << buf.d_readers.size() << std::endl;
+    for (auto& rdr : buf.d_readers) {
+        os << "    rd_idx: " << rdr->get_read_index() << std::endl
+           << "    abs_rd_offset: " << rdr->get_abs_read_offset() << std::endl
+           << std::endl;
     }
+    return os;
 }
 
-long buffer_reader_ncurrently_allocated() { return s_buffer_reader_count; }
-
 } /* namespace gr */
-- 
cgit v1.2.3