From f3c558d88bc68d865f823c31e7d9aa78b3feab59 Mon Sep 17 00:00:00 2001
From: David Sorber <david.sorber@blacklynx.tech>
Date: Thu, 29 Jul 2021 11:34:37 -0400
Subject: runtime: Custom Buffer/Accelerator Device Support - Milestone 2

Completion of custom buffer/accelerator device support changes:

    * Improved custom buffer interface by removing awkward memory
      allocation functions from the block class
    * Increased flexibility for creating custom buffers by allowing
      creation of buffer_single_mapped subclasses
    * Fully incorporated data movement abstraction into the custom
      buffer interface and the runtime itself; accelerated blocks are no
      longer directly responsible for their own data movement
    * Zero copy back-to-back accelerated blocks are now supported (data
      no longer needs to be moved back to the host between each block)

Signed-off-by: David Sorber <david.sorber@blacklynx.tech>
Signed-off-by: Mike Mason <mike.mason@blacklynx.tech>
---
 gnuradio-runtime/lib/CMakeLists.txt          |   4 +-
 gnuradio-runtime/lib/block.cc                |  63 +++--
 gnuradio-runtime/lib/block_detail.cc         |   2 +
 gnuradio-runtime/lib/block_executor.cc       |  22 +-
 gnuradio-runtime/lib/buffer.cc               |  71 +++---
 gnuradio-runtime/lib/buffer_context.cc       |  32 +++
 gnuradio-runtime/lib/buffer_double_mapped.cc |  28 ++-
 gnuradio-runtime/lib/buffer_reader.cc        |  26 ++-
 gnuradio-runtime/lib/buffer_reader_sm.cc     |  85 +------
 gnuradio-runtime/lib/buffer_single_mapped.cc | 337 +++++++++++++++++++--------
 gnuradio-runtime/lib/buffer_type.cc          |  18 --
 gnuradio-runtime/lib/flat_flowgraph.cc       |  67 ++++--
 gnuradio-runtime/lib/host_buffer.cc          | 255 ++++++++++++++++++++
 gnuradio-runtime/lib/io_signature.cc         |  72 ++++--
 gnuradio-runtime/lib/qa_buffer.cc            |   8 +-
 gnuradio-runtime/lib/qa_host_buffer.cc       | 334 ++++++++++++++++++++++++++
 16 files changed, 1089 insertions(+), 335 deletions(-)
 create mode 100644 gnuradio-runtime/lib/buffer_context.cc
 delete mode 100644 gnuradio-runtime/lib/buffer_type.cc
 create mode 100644 gnuradio-runtime/lib/host_buffer.cc
 create mode 100644 gnuradio-runtime/lib/qa_host_buffer.cc

(limited to 'gnuradio-runtime/lib')

diff --git a/gnuradio-runtime/lib/CMakeLists.txt b/gnuradio-runtime/lib/CMakeLists.txt
index ca96549386..6fcd31bac4 100644
--- a/gnuradio-runtime/lib/CMakeLists.txt
+++ b/gnuradio-runtime/lib/CMakeLists.txt
@@ -55,16 +55,17 @@ add_library(gnuradio-runtime
   block_gateway_impl.cc
   block_registry.cc
   buffer.cc
+  buffer_context.cc
   buffer_double_mapped.cc
   buffer_reader.cc
   buffer_reader_sm.cc
   buffer_single_mapped.cc
-  buffer_type.cc
   flat_flowgraph.cc
   flowgraph.cc
   hier_block2.cc
   hier_block2_detail.cc
   high_res_timer.cc
+  host_buffer.cc
   io_signature.cc
   local_sighandler.cc
   logger.cc
@@ -335,6 +336,7 @@ if(ENABLE_TESTING)
     qa_buffer.cc
     qa_io_signature.cc
     qa_logger.cc
+    qa_host_buffer.cc
     qa_vmcircbuf.cc
   )
   list(APPEND GR_TEST_TARGET_DEPS gnuradio-runtime gnuradio-pmt)
diff --git a/gnuradio-runtime/lib/block.cc b/gnuradio-runtime/lib/block.cc
index bb6ce95298..75f9dc6ae4 100644
--- a/gnuradio-runtime/lib/block.cc
+++ b/gnuradio-runtime/lib/block.cc
@@ -384,7 +384,8 @@ void block::set_min_output_buffer(int port, long min_output_buffer)
 void block::allocate_detail(int ninputs,
                             int noutputs,
                             const std::vector<int>& downstream_max_nitems_vec,
-                            const std::vector<uint64_t>& downstream_lcm_nitems_vec)
+                            const std::vector<uint64_t>& downstream_lcm_nitems_vec,
+                            const std::vector<uint32_t>& downstream_max_out_mult_vec)
 {
     block_detail_sptr detail = make_block_detail(ninputs, noutputs);
 
@@ -393,8 +394,10 @@ void block::allocate_detail(int ninputs,
     for (int i = 0; i < noutputs; i++) {
         expand_minmax_buffer(i);
 
-        buffer_sptr buffer = allocate_buffer(
-            i, downstream_max_nitems_vec[i], downstream_lcm_nitems_vec[i]);
+        buffer_sptr buffer = allocate_buffer(i,
+                                             downstream_max_nitems_vec[i],
+                                             downstream_lcm_nitems_vec[i],
+                                             downstream_max_out_mult_vec[i]);
         GR_LOG_DEBUG(d_debug_logger,
                      "Allocated buffer for output " + identifier() + " " +
                          std::to_string(i));
@@ -413,19 +416,24 @@ void block::allocate_detail(int ninputs,
     set_detail(detail);
 }
 
-buffer_sptr block::replace_buffer(uint32_t out_port, block_sptr block_owner)
+buffer_sptr
+block::replace_buffer(uint32_t src_port, uint32_t dst_port, block_sptr block_owner)
 {
     block_detail_sptr detail_ = detail();
-    buffer_sptr orig_buffer = detail_->output(out_port);
+    buffer_sptr orig_buffer = detail_->output(src_port);
 
-    // Make a new buffer but this time use the passed in block as the owner
-    buffer_sptr new_buffer = make_buffer(orig_buffer->bufsize(),
-                                         orig_buffer->get_sizeof_item(),
-                                         orig_buffer->get_downstream_lcm_nitems(),
-                                         shared_from_base<block>(),
-                                         block_owner);
+    buffer_type buftype = block_owner->output_signature()->stream_buffer_type(dst_port);
 
-    detail_->set_output(out_port, new_buffer);
+    // Make a new buffer but this time use the passed in block as the owner
+    buffer_sptr new_buffer =
+        buftype.make_buffer(orig_buffer->bufsize(),
+                            orig_buffer->get_sizeof_item(),
+                            orig_buffer->get_downstream_lcm_nitems(),
+                            orig_buffer->get_max_reader_output_multiple(),
+                            shared_from_base<block>(),
+                            block_owner);
+
+    detail_->set_output(src_port, new_buffer);
     return new_buffer;
 }
 
@@ -435,7 +443,8 @@ void block::enable_update_rate(bool en) { d_update_rate = en; }
 
 buffer_sptr block::allocate_buffer(int port,
                                    int downstream_max_nitems,
-                                   uint64_t downstream_lcm_nitems)
+                                   uint64_t downstream_lcm_nitems,
+                                   uint32_t downstream_max_out_mult)
 {
     int item_size = output_signature()->sizeof_stream_item(port);
 
@@ -473,14 +482,16 @@ buffer_sptr block::allocate_buffer(int port,
     buffer_sptr buf;
 
 #ifdef BUFFER_DEBUG
-    // BUFFER DEBUG
     GR_LOG_DEBUG(d_logger,
                  "Block: " + name() + " allocated buffer for output " + identifier());
 #endif
 
+    // Grab the buffer type associated with the output port and use it to
+    // create the specified type of buffer
+    buffer_type buftype = output_signature()->stream_buffer_type(port);
+
     try {
 #ifdef BUFFER_DEBUG
-        // BUFFER DEBUG
         std::ostringstream msg;
         msg << "downstream_max_nitems: " << downstream_max_nitems
             << " -- downstream_lcm_nitems: " << downstream_lcm_nitems
@@ -498,18 +509,20 @@ buffer_sptr block::allocate_buffer(int port,
         }
         GR_LOG_DEBUG(d_logger, msg.str());
 #endif
-        buf = make_buffer(nitems,
-                          item_size,
-                          downstream_lcm_nitems,
-                          shared_from_base<block>(),
-                          shared_from_base<block>());
+        buf = buftype.make_buffer(nitems,
+                                  item_size,
+                                  downstream_lcm_nitems,
+                                  downstream_max_out_mult,
+                                  shared_from_base<block>(),
+                                  shared_from_base<block>());
 
     } catch (std::bad_alloc&) {
-        buf = make_buffer(nitems,
-                          item_size,
-                          downstream_lcm_nitems,
-                          shared_from_base<block>(),
-                          shared_from_base<block>());
+        buf = buftype.make_buffer(nitems,
+                                  item_size,
+                                  downstream_lcm_nitems,
+                                  downstream_max_out_mult,
+                                  shared_from_base<block>(),
+                                  shared_from_base<block>());
     }
 
     // Set the max noutput items size here to make sure it's always
diff --git a/gnuradio-runtime/lib/block_detail.cc b/gnuradio-runtime/lib/block_detail.cc
index f5283c56b0..66b32ec078 100644
--- a/gnuradio-runtime/lib/block_detail.cc
+++ b/gnuradio-runtime/lib/block_detail.cc
@@ -114,6 +114,7 @@ void block_detail::consume_each(int how_many_items)
 void block_detail::produce(int which_output, int how_many_items)
 {
     if (how_many_items > 0) {
+        d_output[which_output]->post_work(how_many_items);
         d_output[which_output]->update_write_pointer(how_many_items);
         d_produce_or |= how_many_items;
     }
@@ -123,6 +124,7 @@ void block_detail::produce_each(int how_many_items)
 {
     if (how_many_items > 0) {
         for (int i = 0; i < noutputs(); i++) {
+            d_output[i]->post_work(how_many_items);
             d_output[i]->update_write_pointer(how_many_items);
         }
         d_produce_or |= how_many_items;
diff --git a/gnuradio-runtime/lib/block_executor.cc b/gnuradio-runtime/lib/block_executor.cc
index 6fbf2c5c17..ea59196571 100644
--- a/gnuradio-runtime/lib/block_executor.cc
+++ b/gnuradio-runtime/lib/block_executor.cc
@@ -59,9 +59,11 @@ static int min_available_space(block* m,
                                int min_noutput_items,
                                int& output_idx)
 {
+#if ENABLE_LOGGING
     gr::logger_ptr logger;
     gr::logger_ptr debug_logger;
     gr::configure_default_loggers(logger, debug_logger, "min_available_space");
+#endif
 
     int min_space = std::numeric_limits<int>::max();
     if (min_noutput_items == 0)
@@ -285,7 +287,7 @@ block_executor::state block_executor::run_one_iteration()
 
         // determine the minimum available output space
         output_idx = 0;
-    out_try_again:
+    blkd_out_try_again:
         noutput_items = min_available_space(
             m, d, m->output_multiple(), m->min_noutput_items(), output_idx);
         noutput_items = std::min(noutput_items, max_noutput_items);
@@ -312,7 +314,7 @@ block_executor::state block_executor::run_one_iteration()
                         msg << m << " -- BLKD_OUT -- ([1] try again idx: " << output_idx
                             << ")";
                         GR_LOG_INFO(d_debug_logger, msg.str()););
-                    goto out_try_again;
+                    goto blkd_out_try_again;
                 }
             } else {
                 return BLKD_OUT;
@@ -329,7 +331,6 @@ block_executor::state block_executor::run_one_iteration()
         d_input_done.resize(d->ninputs());
         d_output_items.resize(0);
         d_start_nitems_read.resize(d->ninputs());
-        //        LOG(GR_LOG_INFO(d_debug_logger, "sink"););
         LOG(std::ostringstream msg; msg << m << " -- sink";
             GR_LOG_INFO(d_debug_logger, msg.str()););
 
@@ -367,7 +368,6 @@ block_executor::state block_executor::run_one_iteration()
             GR_LOG_INFO(d_debug_logger, msg.str()););
 
         if (noutput_items == 0) { // we're blocked on input
-            //            LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN"););
             LOG(std::ostringstream msg; msg << m << " -- BLKD_IN";
                 GR_LOG_INFO(d_debug_logger, msg.str()));
             return BLKD_IN;
@@ -400,7 +400,7 @@ block_executor::state block_executor::run_one_iteration()
 
         // determine the minimum available output space
         output_idx = 0;
-    out_try_again2:
+    blkd_out_try_again2:
         noutput_items = min_available_space(
             m, d, m->output_multiple(), m->min_noutput_items(), output_idx);
         if (ENABLE_LOGGING) {
@@ -415,7 +415,6 @@ block_executor::state block_executor::run_one_iteration()
             goto were_done;
 
         if (noutput_items == 0) { // we're output blocked
-            //            LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT"););
             LOG(std::ostringstream msg; msg << m << " -- BLKD_OUT";
                 GR_LOG_INFO(d_debug_logger, msg.str()));
 
@@ -435,7 +434,7 @@ block_executor::state block_executor::run_one_iteration()
                         msg << m << " -- BLKD_OUT -- ([2] try again idx: " << output_idx
                             << ")";
                         GR_LOG_INFO(d_debug_logger, msg.str()););
-                    goto out_try_again2;
+                    goto blkd_out_try_again2;
                 }
             } else {
                 return BLKD_OUT;
@@ -532,10 +531,6 @@ block_executor::state block_executor::run_one_iteration()
 
             buffer_reader_sptr in_buf = d->input(i);
 
-            LOG(std::ostringstream msg;
-                msg << m << " (t: " << this << ") -- pre-callback";
-                GR_LOG_DEBUG(d_debug_logger, msg.str()));
-
             if (in_buf->input_blkd_cb_ready(d_ninput_items_required[i])) {
                 gr::custom_lock lock(std::ref(*in_buf->mutex()), in_buf->buffer());
                 if (in_buf->input_blocked_callback(d_ninput_items_required[i],
@@ -681,8 +676,8 @@ block_executor::state block_executor::run_one_iteration()
                 GR_LOG_DEBUG(d_debug_logger, msg.str()););
             gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf);
             out_buf->output_blocked_callback(m->output_multiple(), true);
-            LOG(std::ostringstream msg; msg << m << " -- NO OUTPUT -- [" << i
-                                            << "] -- OUTPUT BLOCKED CBACK: " << rc;
+            LOG(std::ostringstream msg;
+                msg << m << " -- NO OUTPUT -- [" << i << "] -- OUTPUT BLOCKED CBACK ";
                 GR_LOG_DEBUG(d_debug_logger, msg.str()););
         }
 
@@ -692,7 +687,6 @@ block_executor::state block_executor::run_one_iteration()
     GR_LOG_ERROR(d_logger, "invalid state while going through iteration state machine");
 
 were_done:
-    //    LOG(GR_LOG_INFO(d_debug_logger, "we're done"););
     LOG(std::ostringstream msg; msg << m << " -- we're done";
         GR_LOG_INFO(d_debug_logger, msg.str()));
     d->set_done(true);
diff --git a/gnuradio-runtime/lib/buffer.cc b/gnuradio-runtime/lib/buffer.cc
index 7fd0a39579..75fc447e8d 100644
--- a/gnuradio-runtime/lib/buffer.cc
+++ b/gnuradio-runtime/lib/buffer.cc
@@ -12,6 +12,7 @@
 #include "config.h"
 #endif
 #include "vmcircbuf.h"
+#include <gnuradio/block.h>
 #include <gnuradio/buffer.h>
 #include <gnuradio/buffer_double_mapped.h>
 #include <gnuradio/buffer_reader.h>
@@ -56,10 +57,11 @@ static long s_buffer_count = 0; // counts for debugging storage mgmt
  ---------------------------------------------------------------------------- */
 
 
-buffer::buffer(BufferMappingType buf_type,
+buffer::buffer(buffer_mapping_type buf_type,
                int nitems,
                size_t sizeof_item,
                uint64_t downstream_lcm_nitems,
+               uint32_t downstream_max_out_mult,
                block_sptr link)
     : d_base(0),
       d_bufsize(0),
@@ -76,7 +78,9 @@ buffer::buffer(BufferMappingType buf_type,
       d_callback_flag(false),
       d_active_pointer_counter(0),
       d_downstream_lcm_nitems(downstream_lcm_nitems),
-      d_write_multiple(0)
+      d_write_multiple(0),
+      d_max_reader_output_multiple(downstream_max_out_mult),
+      d_context(buffer_context::DEFAULT_INVALID)
 {
     gr::configure_default_loggers(d_logger, d_debug_logger, "buffer");
 
@@ -86,44 +90,27 @@ buffer::buffer(BufferMappingType buf_type,
 buffer_sptr make_buffer(int nitems,
                         size_t sizeof_item,
                         uint64_t downstream_lcm_nitems,
+                        uint32_t downstream_max_out_mult,
                         block_sptr link,
                         block_sptr buf_owner)
 {
 #ifdef BUFFER_DEBUG
-    // BUFFER DEBUG
     gr::logger_ptr logger;
     gr::logger_ptr debug_logger;
     gr::configure_default_loggers(logger, debug_logger, "make_buffer");
     std::ostringstream msg;
 #endif
 
-#if DEBUG_SINGLE_MAPPED
-    if (1) {
-#else
-    if (buf_owner->get_buffer_type() != buftype_DEFAULT_NON_CUSTOM::get()) {
-#endif
-        // Buffer type is NOT the default non custom variety so allocate a
-        // buffer_single_mapped instance
-#ifdef BUFFER_DEBUG
-        msg << "buffer_single_mapped nitems: " << nitems
-            << " -- sizeof_item: " << sizeof_item;
-        GR_LOG_DEBUG(logger, msg.str());
-#endif
-
-        return buffer_sptr(new buffer_single_mapped(
-            nitems, sizeof_item, downstream_lcm_nitems, link, buf_owner));
-
-    } else {
-        // Default to allocating a buffer_double_mapped instance
-#ifdef BUFFER_DEBUG
-        msg << "buffer_double_mapped nitems: " << nitems
-            << " -- sizeof_item: " << sizeof_item;
-        GR_LOG_DEBUG(logger, msg.str());
-#endif
-
-        return buffer_sptr(
-            new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link));
-    }
+    // NOTE: This function is no longer called by flat_flowgraph functions and
+    // therefore is somewhat deprecated. It will create and return a
+    // buffer_double_mapped subclass by default.
+    buffer_type buftype = buffer_double_mapped::type;
+    return buftype.make_buffer(nitems,
+                               sizeof_item,
+                               downstream_lcm_nitems,
+                               downstream_max_out_mult,
+                               link,
+                               buf_owner);
 }
 
 buffer::~buffer()
@@ -134,12 +121,16 @@ buffer::~buffer()
 
 void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; }
 
+const void* buffer::_read_pointer(unsigned int read_index)
+{
+    return &d_base[read_index * d_sizeof_item];
+}
+
 void buffer::update_write_pointer(int nitems)
 {
     gr::thread::scoped_lock guard(*mutex());
 
 #ifdef BUFFER_DEBUG
-    // BUFFER DEBUG
     unsigned orig_wr_idx = d_write_index;
 #endif
 
@@ -147,7 +138,6 @@ void buffer::update_write_pointer(int nitems)
     d_abs_write_offset += nitems;
 
 #ifdef BUFFER_DEBUG
-    // BUFFER DEBUG
     std::ostringstream msg;
     msg << "[" << this << "] update_write_pointer -- orig d_write_index: " << orig_wr_idx
         << " -- nitems: " << nitems << " -- d_write_index: " << d_write_index;
@@ -266,4 +256,21 @@ std::ostream& operator<<(std::ostream& os, const buffer& buf)
     return os;
 }
 
+void buffer::set_context(const buffer_context& context)
+{
+    if ((d_context == buffer_context::DEFAULT_INVALID) || (d_context == context)) {
+        // Set the context if the existing value is the default or if it is the
+        // same as what's already been set
+        d_context = context;
+    } else {
+        // Otherwise error out as the context value cannot be changed after
+        // it is set
+        std::ostringstream msg;
+        msg << "Block: " << link()->identifier() << " has context " << d_context
+            << " assigned. Cannot change to context " << context << ".";
+        GR_LOG_ERROR(d_logger, msg.str());
+        throw std::runtime_error(msg.str());
+    }
+}
+
 } /* namespace gr */
diff --git a/gnuradio-runtime/lib/buffer_context.cc b/gnuradio-runtime/lib/buffer_context.cc
new file mode 100644
index 0000000000..9cd27add36
--- /dev/null
+++ b/gnuradio-runtime/lib/buffer_context.cc
@@ -0,0 +1,32 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2021 BlackLynx, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+#include <gnuradio/buffer_context.h>
+
+namespace gr {
+
+std::ostream& operator<<(std::ostream& os, const buffer_context& context)
+{
+    switch (context) {
+    case buffer_context::DEFAULT_INVALID:
+        return os << "DEFAULT_INVALID";
+    case buffer_context::HOST_TO_DEVICE:
+        return os << "HOST_TO_DEVICE";
+    case buffer_context::DEVICE_TO_HOST:
+        return os << "DEVICE_TO_HOST";
+    case buffer_context::HOST_TO_HOST:
+        return os << "HOST_TO_HOST";
+    case buffer_context::DEVICE_TO_DEVICE:
+        return os << "DEVICE_TO_DEVICE";
+    default:
+        return os << "Unknown buffer context: " << static_cast<int>(context);
+    }
+}
+
+} // namespace gr
diff --git a/gnuradio-runtime/lib/buffer_double_mapped.cc b/gnuradio-runtime/lib/buffer_double_mapped.cc
index ad99c2162b..700ebac0c1 100644
--- a/gnuradio-runtime/lib/buffer_double_mapped.cc
+++ b/gnuradio-runtime/lib/buffer_double_mapped.cc
@@ -36,23 +36,25 @@ static inline long minimum_buffer_items(long type_size, long page_size)
     return page_size / GR_GCD(type_size, page_size);
 }
 
+buffer_type buffer_double_mapped::type(buftype_DEFAULT_NON_CUSTOM{});
 
 buffer_double_mapped::buffer_double_mapped(int nitems,
                                            size_t sizeof_item,
                                            uint64_t downstream_lcm_nitems,
+                                           uint32_t downstream_max_out_mult,
                                            block_sptr link)
-    : buffer(BufferMappingType::DoubleMapped,
+    : buffer(buffer_mapping_type::double_mapped,
              nitems,
              sizeof_item,
              downstream_lcm_nitems,
+             downstream_max_out_mult,
              link)
 {
     gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_double_mapped");
-    if (!allocate_buffer(nitems, sizeof_item))
+    if (!allocate_buffer(nitems))
         throw std::bad_alloc();
 
 #ifdef BUFFER_DEBUG
-    // BUFFER DEBUG
     {
         std::ostringstream msg;
         msg << "[" << this << "] "
@@ -62,13 +64,18 @@ buffer_double_mapped::buffer_double_mapped(int nitems,
 #endif
 }
 
+// NB: Added the extra 'block_sptr unused' parameter so that the
+// call signature matches the other factory-like functions used to create
+// the buffer_single_mapped subclasses
 buffer_sptr make_buffer_double_mapped(int nitems,
                                       size_t sizeof_item,
                                       uint64_t downstream_lcm_nitems,
-                                      block_sptr link)
+                                      uint32_t downstream_max_out_mult,
+                                      block_sptr link,
+                                      block_sptr unused)
 {
-    return buffer_sptr(
-        new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link));
+    return buffer_sptr(new buffer_double_mapped(
+        nitems, sizeof_item, downstream_lcm_nitems, downstream_max_out_mult, link));
 }
 
 buffer_double_mapped::~buffer_double_mapped() {}
@@ -77,13 +84,13 @@ buffer_double_mapped::~buffer_double_mapped() {}
  * sets d_vmcircbuf, d_base, d_bufsize.
  * returns true iff successful.
  */
-bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item)
+bool buffer_double_mapped::allocate_buffer(int nitems)
 {
     int orig_nitems = nitems;
 
     // Any buffer size we come up with must be a multiple of min_nitems.
     int granularity = gr::vmcircbuf_sysconfig::granularity();
-    int min_nitems = minimum_buffer_items(sizeof_item, granularity);
+    int min_nitems = minimum_buffer_items(d_sizeof_item, granularity);
 
     // Round-up nitems to a multiple of min_nitems.
     if (nitems % min_nitems != 0)
@@ -91,7 +98,7 @@ bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item)
 
     // If we rounded-up a whole bunch, give the user a heads up.
     // This only happens if sizeof_item is not a power of two.
-    if (nitems > 2 * orig_nitems && nitems * (int)sizeof_item > granularity) {
+    if (nitems > 2 * orig_nitems && nitems * (int)d_sizeof_item > granularity) {
         auto msg =
             str(boost::format(
                     "allocate_buffer: tried to allocate"
@@ -99,7 +106,7 @@ bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item)
                     "   %d were allocated.  If this isn't OK, consider padding"
                     "   your structure to a power-of-two bytes."
                     "   On this platform, our allocation granularity is %d bytes.") %
-                orig_nitems % sizeof_item % nitems % granularity);
+                orig_nitems % d_sizeof_item % nitems % granularity);
         GR_LOG_WARN(d_logger, msg.c_str());
     }
 
@@ -138,7 +145,6 @@ int buffer_double_mapped::space_available()
         }
 
 #ifdef BUFFER_DEBUG
-        // BUFFER DEBUG
         std::ostringstream msg;
         msg << "[" << this << "] "
             << "space_available() called  d_write_index: " << d_write_index
diff --git a/gnuradio-runtime/lib/buffer_reader.cc b/gnuradio-runtime/lib/buffer_reader.cc
index 7ead53032e..b1c1e7fb73 100644
--- a/gnuradio-runtime/lib/buffer_reader.cc
+++ b/gnuradio-runtime/lib/buffer_reader.cc
@@ -34,11 +34,11 @@ buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay
 
     buffer_reader_sptr r;
 
-    if (buf->get_mapping_type() == BufferMappingType::DoubleMapped) {
+    if (buf->get_mapping_type() == buffer_mapping_type::double_mapped) {
         r.reset(new buffer_reader(
             buf, buf->index_sub(buf->d_write_index, nzero_preload), link));
         r->declare_sample_delay(delay);
-    } else if (buf->get_mapping_type() == BufferMappingType::SingleMapped) {
+    } else if (buf->get_mapping_type() == buffer_mapping_type::single_mapped) {
         r.reset(new buffer_reader_sm(
             buf, buf->index_sub(buf->d_write_index, nzero_preload), link));
         r->declare_sample_delay(delay);
@@ -51,11 +51,15 @@ buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay
     buf->d_readers.push_back(r.get());
 
 #ifdef BUFFER_DEBUG
-    // BUFFER DEBUG
-    std::cerr << " [" << buf.get() << ";" << r.get()
-              << "] buffer_add_reader() nzero_preload " << nzero_preload
-              << " -- delay: " << delay << " -- history: " << link->history()
-              << " -- RD_idx: " << r->d_read_index << std::endl;
+    gr::logger_ptr logger;
+    gr::logger_ptr debug_logger;
+    gr::configure_default_loggers(logger, debug_logger, "buffer_add_reader");
+
+    std::ostringstream msg;
+    msg << " [" << buf.get() << ";" << r.get() << "] buffer_add_reader() nzero_preload "
+        << nzero_preload << " -- delay: " << delay << " -- history: " << link->history()
+        << " -- RD_idx: " << r->d_read_index;
+    GR_LOG_DEBUG(debug_logger, msg.str());
 #endif
 
     return r;
@@ -89,12 +93,11 @@ void buffer_reader::declare_sample_delay(unsigned delay)
 
 unsigned buffer_reader::sample_delay() const { return d_attr_delay; }
 
-int buffer_reader::items_available() // const
+int buffer_reader::items_available() const
 {
     int available = d_buffer->index_sub(d_buffer->d_write_index, d_read_index);
 
 #ifdef BUFFER_DEBUG
-    // BUFFER DEBUG
     std::ostringstream msg;
     msg << "[" << d_buffer << ";" << this << "] "
         << "items_available() WR_idx: " << d_buffer->d_write_index
@@ -109,7 +112,8 @@ int buffer_reader::items_available() // const
 
 const void* buffer_reader::read_pointer()
 {
-    return &d_buffer->d_base[d_read_index * d_buffer->d_sizeof_item];
+    // Delegate to buffer subclass
+    return d_buffer->_read_pointer(d_read_index);
 }
 
 void buffer_reader::update_read_pointer(int nitems)
@@ -117,7 +121,6 @@ void buffer_reader::update_read_pointer(int nitems)
     gr::thread::scoped_lock guard(*mutex());
 
 #ifdef BUFFER_DEBUG
-    // BUFFER DEBUG
     unsigned orig_rd_idx = d_read_index;
 #endif
 
@@ -125,7 +128,6 @@ void buffer_reader::update_read_pointer(int nitems)
     d_abs_read_offset += nitems;
 
 #ifdef BUFFER_DEBUG
-    // BUFFER DEBUG
     std::ostringstream msg;
     msg << "[" << d_buffer << ";" << this
         << "] update_read_pointer -- orig d_read_index: " << orig_rd_idx
diff --git a/gnuradio-runtime/lib/buffer_reader_sm.cc b/gnuradio-runtime/lib/buffer_reader_sm.cc
index 9e0fac584f..c0d4b07d65 100644
--- a/gnuradio-runtime/lib/buffer_reader_sm.cc
+++ b/gnuradio-runtime/lib/buffer_reader_sm.cc
@@ -26,13 +26,14 @@ namespace gr {
 
 buffer_reader_sm::~buffer_reader_sm() {}
 
-int buffer_reader_sm::items_available()
+int buffer_reader_sm::items_available() const
 {
     int available = 0;
 
     if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) {
         if (d_buffer->d_write_index == d_read_index) {
-            if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) {
+            if ((nitems_read() - sample_delay()) !=
+                (d_buffer->nitems_written() + link()->history() - 1)) {
                 available = d_buffer->d_bufsize - d_read_index;
             }
         } else {
@@ -55,87 +56,15 @@ int buffer_reader_sm::items_available()
 
 bool buffer_reader_sm::input_blkd_cb_ready(int items_required) const
 {
-    gr::thread::scoped_lock(*d_buffer->mutex());
-
-    return (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) &&
-            (d_buffer->d_write_index < d_read_index));
+    return d_buffer->input_blkd_cb_ready(items_required, d_read_index);
 }
 
 bool buffer_reader_sm::input_blocked_callback(int items_required, int items_avail)
 {
-    // Maybe adjust read pointers from min read index?
-    // This would mean that *all* readers must be > (passed) the write index
-    if (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) &&
-        (d_buffer->d_write_index < d_read_index)) {
-
-        // Update items available before going farther as it could be stale
-        items_avail = items_available();
-
-        // Find reader with the smallest read index that is greater than the
-        // write index
-        uint32_t min_reader_index = std::numeric_limits<uint32_t>::max();
-        uint32_t min_read_idx = std::numeric_limits<uint32_t>::max();
-        for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
-            if (d_buffer->d_readers[idx]->d_read_index > d_buffer->d_write_index) {
-                // Record index of reader with minimum read-index
-                if (d_buffer->d_readers[idx]->d_read_index < min_read_idx) {
-                    min_read_idx = d_buffer->d_readers[idx]->d_read_index;
-                    min_reader_index = idx;
-                }
-            }
-        }
-
-        // Note items_avail might be zero, that's okay.
-        items_avail += d_read_index - min_read_idx;
-        int gap = min_read_idx - d_buffer->d_write_index;
-        if (items_avail > gap) {
-            return false;
-        }
-
-#ifdef BUFFER_DEBUG
-        std::ostringstream msg;
-        msg << "[" << d_buffer << ";" << this << "] "
-            << "input_blocked_callback() WR_idx: " << d_buffer->d_write_index
-            << " -- WR items: " << d_buffer->nitems_written()
-            << " -- BUFSIZE: " << d_buffer->d_bufsize << " -- RD_idx: " << min_read_idx;
-        for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
-            if (idx != min_reader_index) {
-                msg << " -- OTHER_RDR: " << d_buffer->d_readers[idx]->d_read_index;
-            }
-        }
-
-        msg << " -- GAP: " << gap << " -- items_required: " << items_required
-            << " -- items_avail: " << items_avail;
-        GR_LOG_DEBUG(d_logger, msg.str());
-#endif
-
-        // Shift existing data down to make room for blocked data at end of buffer
-        uint32_t move_data_size = d_buffer->d_write_index * d_buffer->d_sizeof_item;
-        char* dest = d_buffer->d_base + (items_avail * d_buffer->d_sizeof_item);
-        std::memmove(dest, d_buffer->d_base, move_data_size);
-
-        // Next copy the data from the end of the buffer back to the beginning
-        uint32_t avail_data_size = items_avail * d_buffer->d_sizeof_item;
-        char* src = d_buffer->d_base + (min_read_idx * d_buffer->d_sizeof_item);
-        std::memcpy(d_buffer->d_base, src, avail_data_size);
-
-        // Now adjust write pointer
-        d_buffer->d_write_index += items_avail;
-
-        // Finally adjust all reader pointers
-        for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
-            if (idx == min_reader_index) {
-                d_buffer->d_readers[idx]->d_read_index = 0;
-            } else {
-                d_buffer->d_readers[idx]->d_read_index += items_avail;
-                d_buffer->d_readers[idx]->d_read_index %= d_buffer->d_bufsize;
-            }
-        }
-
-        return true;
-    }
+    // Update items available before going farther as it could be stale
+    items_avail = items_available();
 
-    return false;
+    return d_buffer->input_blocked_callback(items_required, items_avail, d_read_index);
 }
 
 buffer_reader_sm::buffer_reader_sm(buffer_sptr buffer,
diff --git a/gnuradio-runtime/lib/buffer_single_mapped.cc b/gnuradio-runtime/lib/buffer_single_mapped.cc
index 6024396557..db7959c5b4 100644
--- a/gnuradio-runtime/lib/buffer_single_mapped.cc
+++ b/gnuradio-runtime/lib/buffer_single_mapped.cc
@@ -19,6 +19,8 @@
 #include <gnuradio/thread/thread.h>
 #include <assert.h>
 #include <algorithm>
+#include <cstdlib>
+#include <cstring>
 #include <iostream>
 #include <stdexcept>
 
@@ -27,30 +29,18 @@ namespace gr {
 buffer_single_mapped::buffer_single_mapped(int nitems,
                                            size_t sizeof_item,
                                            uint64_t downstream_lcm_nitems,
+                                           uint32_t downstream_max_out_mult,
                                            block_sptr link,
                                            block_sptr buf_owner)
-    : buffer(BufferMappingType::SingleMapped,
+    : buffer(buffer_mapping_type::single_mapped,
              nitems,
              sizeof_item,
              downstream_lcm_nitems,
+             downstream_max_out_mult,
              link),
       d_buf_owner(buf_owner),
-      d_buffer(nullptr,
-               std::bind(&buffer_single_mapped::deleter, this, std::placeholders::_1))
+      d_buffer(nullptr)
 {
-    gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_single_mapped");
-    if (!allocate_buffer(nitems, sizeof_item, downstream_lcm_nitems))
-        throw std::bad_alloc();
-
-#ifdef BUFFER_DEBUG
-    // BUFFER DEBUG
-    {
-        std::ostringstream msg;
-        msg << "[" << this << "] "
-            << "buffer_single_mapped constructor -- history: " << link->history();
-        GR_LOG_DEBUG(d_logger, msg.str());
-    }
-#endif
 }
 
 buffer_single_mapped::~buffer_single_mapped() {}
@@ -59,14 +49,21 @@ buffer_single_mapped::~buffer_single_mapped() {}
  * Allocates underlying buffer.
  * returns true iff successful.
  */
-bool buffer_single_mapped::allocate_buffer(int nitems,
-                                           size_t sizeof_item,
-                                           uint64_t downstream_lcm_nitems)
+bool buffer_single_mapped::allocate_buffer(int nitems)
 {
 #ifdef BUFFER_DEBUG
     int orig_nitems = nitems;
 #endif
 
+    // For single mapped buffers resize the initial size to be at least four
+    // times the size of the largest of any downstream block's output multiple.
+    // This helps reduce the number of times the input_block_callback() might be
+    // called which should help overall performance (particularly if the max
+    // output multiple is large) at the cost of slightly more buffer space.
+    if (static_cast<uint32_t>(nitems) < (4 * d_max_reader_output_multiple)) {
+        nitems = 4 * d_max_reader_output_multiple;
+    }
+
     // Unlike the double mapped buffer case that can easily wrap back onto itself
     // for both reads and writes the single mapped case needs to be aware of read
     // and write granularity and size the underlying buffer accordingly. Otherwise
@@ -105,33 +102,37 @@ bool buffer_single_mapped::allocate_buffer(int nitems,
 #endif
 
     // Adjust size so output buffer size is a multiple of the write granularity
-    if (write_granularity != 1 || downstream_lcm_nitems != 1) {
-        uint64_t size_align_adjust = GR_LCM(write_granularity, downstream_lcm_nitems);
+    if (write_granularity != 1 || d_downstream_lcm_nitems != 1) {
+        uint64_t size_align_adjust = GR_LCM(write_granularity, d_downstream_lcm_nitems);
         uint64_t remainder = nitems % size_align_adjust;
         nitems += (remainder > 0) ? (size_align_adjust - remainder) : 0;
 
 #ifdef BUFFER_DEBUG
         std::ostringstream msg;
         msg << "allocate_buffer()** called  nitems: " << orig_nitems
-            << " -- read_multiple: " << downstream_lcm_nitems
+            << " -- read_multiple: " << d_downstream_lcm_nitems
             << " -- write_multiple: " << write_granularity
             << " -- NEW nitems: " << nitems;
         GR_LOG_DEBUG(d_logger, msg.str());
 #endif
     }
 
-    // Allocate a new custom buffer from the owning block
-    char* buf = buf_owner()->allocate_custom_buffer(nitems * sizeof_item);
-    assert(buf != nullptr);
-    d_buffer.reset(buf);
-
-    d_base = d_buffer.get();
     d_bufsize = nitems;
 
-    d_downstream_lcm_nitems = downstream_lcm_nitems;
+    d_downstream_lcm_nitems = d_downstream_lcm_nitems;
     d_write_multiple = write_granularity;
 
-    return true;
+    // Do the actual allocation(s) with the finalized nitems
+    return do_allocate_buffer(nitems, d_sizeof_item);
+}
+
+bool buffer_single_mapped::input_blkd_cb_ready(int items_required,
+                                               unsigned int read_index)
+{
+    gr::thread::scoped_lock(*this->mutex());
+
+    return (((d_bufsize - read_index) < (uint32_t)items_required) &&
+            (d_write_index < read_index));
 }
 
 bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple)
@@ -145,70 +146,6 @@ bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple)
             ((space_avail / output_multiple) * output_multiple == 0));
 }
 
-
-bool buffer_single_mapped::output_blocked_callback(int output_multiple, bool force)
-{
-    uint32_t space_avail = space_available();
-
-#ifdef BUFFER_DEBUG
-    std::ostringstream msg;
-    msg << "[" << this << "] "
-        << "output_blocked_callback()*** WR_idx: " << d_write_index
-        << " -- WR items: " << nitems_written()
-        << " -- output_multiple: " << output_multiple
-        << " -- space_avail: " << space_avail << " -- force: " << force;
-    GR_LOG_DEBUG(d_logger, msg.str());
-#endif
-
-    if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) ||
-        force) {
-        // Find reader with the smallest read index
-        uint32_t min_read_idx = d_readers[0]->d_read_index;
-        for (size_t idx = 1; idx < d_readers.size(); ++idx) {
-            // Record index of reader with minimum read-index
-            if (d_readers[idx]->d_read_index < min_read_idx) {
-                min_read_idx = d_readers[idx]->d_read_index;
-            }
-        }
-
-#ifdef BUFFER_DEBUG
-        std::ostringstream msg;
-        msg << "[" << this << "] "
-            << "output_blocked_callback() WR_idx: " << d_write_index
-            << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx
-            << " -- shortcircuit: "
-            << ((min_read_idx == 0) || (min_read_idx >= d_write_index))
-            << " -- to_move_items: " << (d_write_index - min_read_idx)
-            << " -- space_avail: " << space_avail << " -- force: " << force;
-        GR_LOG_DEBUG(d_logger, msg.str());
-#endif
-
-        // Make sure we have enough room to start writing back at the beginning
-        if ((min_read_idx == 0) || (min_read_idx >= d_write_index)) {
-            return false;
-        }
-
-        // Determine how much "to be read" data needs to be moved
-        int to_move_items = d_write_index - min_read_idx;
-        assert(to_move_items > 0);
-        uint32_t to_move_bytes = to_move_items * d_sizeof_item;
-
-        // Shift "to be read" data back to the beginning of the buffer
-        std::memmove(d_base, d_base + (min_read_idx * d_sizeof_item), to_move_bytes);
-
-        // Adjust write index and each reader index
-        d_write_index -= min_read_idx;
-
-        for (size_t idx = 0; idx < d_readers.size(); ++idx) {
-            d_readers[idx]->d_read_index -= min_read_idx;
-        }
-
-        return true;
-    }
-
-    return false;
-}
-
 int buffer_single_mapped::space_available()
 {
     if (d_readers.empty())
@@ -233,7 +170,7 @@ int buffer_single_mapped::space_available()
         // For single mapped buffer there is no wrapping beyond the end of the
         // buffer
 #ifdef BUFFER_DEBUG
-        int thecase = 4; // REMOVE ME - just for debug
+        int thecase = 0;
 #endif
         int space = d_bufsize - d_write_index;
 
@@ -259,15 +196,17 @@ int buffer_single_mapped::space_available()
 #endif
             space = min_read_index - d_write_index;
             // Leave extra space in case the reader gets stuck and needs realignment
-            {
-                if ((d_write_index > (d_bufsize / 2)) ||
-                    (min_read_index < (d_bufsize / 2))) {
+            if (d_max_reader_output_multiple > 1) {
+                if (static_cast<uint32_t>(space) > d_max_reader_output_multiple) {
 #ifdef BUFFER_DEBUG
-                    thecase = 17;
+                    thecase = 4;
 #endif
-                    space = 0;
+                    space = space - d_max_reader_output_multiple;
                 } else {
-                    space = (d_bufsize / 2) - d_write_index;
+#ifdef BUFFER_DEBUG
+                    thecase = 5;
+#endif
+                    space = 0;
                 }
             }
         }
@@ -278,7 +217,6 @@ int buffer_single_mapped::space_available()
         }
 
 #ifdef BUFFER_DEBUG
-        // BUFFER DEBUG
         std::ostringstream msg;
         msg << "[" << this << "] "
             << "space_available() called  (case: " << thecase
@@ -286,6 +224,7 @@ int buffer_single_mapped::space_available()
             << " -- min_read_index: " << min_read_index << " ("
             << min_idx_reader->nitems_read() << ") "
             << " -- space: " << space
+            << " -- max_reader_out_mult: " << d_max_reader_output_multiple
             << " (sample delay: " << min_idx_reader->sample_delay() << ")";
         GR_LOG_DEBUG(d_logger, msg.str());
 #endif
@@ -294,4 +233,198 @@ int buffer_single_mapped::space_available()
     }
 }
 
+void buffer_single_mapped::update_reader_block_history(unsigned history, int delay)
+{
+    unsigned old_max = d_max_reader_history;
+    d_max_reader_history = std::max(d_max_reader_history, history);
+    if (d_max_reader_history != old_max) {
+        d_write_index = d_max_reader_history - 1;
+
+#ifdef BUFFER_DEBUG
+        std::ostringstream msg;
+        msg << "[" << this << "] "
+            << "buffer_single_mapped constructor -- set wr index to: " << d_write_index;
+        GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+        // Reset the reader's read index if the buffer's write index has changed.
+        // Note that "history - 1" is the nzero_preload value passed to
+        // buffer_add_reader.
+        for (auto reader : d_readers) {
+            reader->d_read_index = d_write_index - (reader->link()->history() - 1);
+        }
+    }
+
+    // Only attempt to set has history flag if it is not already set
+    if (!d_has_history) {
+        // Blocks that set delay may set history to delay + 1 but this is
+        // not "real" history
+        d_has_history = ((static_cast<int>(history) - 1) != delay);
+    }
+}
+
+//------------------------------------------------------------------------------
+
+bool buffer_single_mapped::input_blocked_callback_logic(int items_required,
+                                                        int items_avail,
+                                                        unsigned read_index,
+                                                        char* buffer_ptr,
+                                                        memcpy_func_t memcpy_func,
+                                                        memmove_func_t memmove_func)
+{
+#ifdef BUFFER_DEBUG
+    std::ostringstream msg;
+    msg << "[" << this << "] "
+        << "input_blocked_callback() WR_idx: " << d_write_index
+        << " -- WR items: " << nitems_written() << " -- BUFSIZE: " << d_bufsize
+        << " -- RD_idx: " << read_index << " -- items_required: " << items_required
+        << " -- items_avail: " << items_avail;
+    GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+    // Maybe adjust read pointers from min read index?
+    // This would mean that *all* readers must be > (passed) the write index
+    if (((d_bufsize - read_index) < (uint32_t)items_required) &&
+        (d_write_index < read_index)) {
+
+        // Find reader with the smallest read index that is greater than the
+        // write index
+        uint32_t min_reader_index = std::numeric_limits<uint32_t>::max();
+        uint32_t min_read_idx = std::numeric_limits<uint32_t>::max();
+        for (size_t idx = 0; idx < d_readers.size(); ++idx) {
+            if (d_readers[idx]->d_read_index > d_write_index) {
+                // Record index of reader with minimum read-index
+                if (d_readers[idx]->d_read_index < min_read_idx) {
+                    min_read_idx = d_readers[idx]->d_read_index;
+                    min_reader_index = idx;
+                }
+            }
+        }
+
+        // Note items_avail might be zero, that's okay.
+        items_avail += read_index - min_read_idx;
+        int gap = min_read_idx - d_write_index;
+        if (items_avail > gap) {
+            return false;
+        }
+
+#ifdef BUFFER_DEBUG
+        std::ostringstream msg;
+        msg << "[" << this << "] "
+            << "input_blocked_callback() WR_idx: " << d_write_index
+            << " -- WR items: " << nitems_written() << " -- BUFSIZE: " << d_bufsize
+            << " -- RD_idx: " << min_read_idx;
+        for (size_t idx = 0; idx < d_readers.size(); ++idx) {
+            if (idx != min_reader_index) {
+                msg << " -- OTHER_RDR: " << d_readers[idx]->d_read_index;
+            }
+        }
+
+        msg << " -- GAP: " << gap << " -- items_required: " << items_required
+            << " -- items_avail: " << items_avail;
+        GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+        // Shift existing data down to make room for blocked data at end of buffer
+        uint32_t move_data_size = d_write_index * d_sizeof_item;
+        char* dest = buffer_ptr + (items_avail * d_sizeof_item);
+        memmove_func(dest, buffer_ptr, move_data_size);
+
+        // Next copy the data from the end of the buffer back to the beginning
+        uint32_t avail_data_size = items_avail * d_sizeof_item;
+        char* src = buffer_ptr + (min_read_idx * d_sizeof_item);
+        memcpy_func(buffer_ptr, src, avail_data_size);
+
+        // Now adjust write pointer
+        d_write_index += items_avail;
+
+        // Finally adjust all reader pointers
+        for (size_t idx = 0; idx < d_readers.size(); ++idx) {
+            if (idx == min_reader_index) {
+                d_readers[idx]->d_read_index = 0;
+            } else {
+                d_readers[idx]->d_read_index += items_avail;
+                d_readers[idx]->d_read_index %= d_bufsize;
+            }
+        }
+
+        return true;
+    }
+
+    return false;
+}
+
+bool buffer_single_mapped::output_blocked_callback_logic(int output_multiple,
+                                                         bool force,
+                                                         char* buffer_ptr,
+                                                         memmove_func_t memmove_func)
+{
+    uint32_t space_avail = space_available();
+
+#ifdef BUFFER_DEBUG
+    std::ostringstream msg;
+    msg << "[" << this << "] "
+        << "output_blocked_callback()*** WR_idx: " << d_write_index
+        << " -- WR items: " << nitems_written()
+        << " -- output_multiple: " << output_multiple
+        << " -- space_avail: " << space_avail << " -- force: " << force;
+    GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+    if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) ||
+        force) {
+        // Find reader with the smallest read index
+        uint32_t min_read_idx = d_readers[0]->d_read_index;
+        uint64_t min_read_idx_nitems = d_readers[0]->nitems_read();
+        for (size_t idx = 1; idx < d_readers.size(); ++idx) {
+            // Record index of reader with minimum read-index
+            if (d_readers[idx]->d_read_index < min_read_idx) {
+                min_read_idx = d_readers[idx]->d_read_index;
+                min_read_idx_nitems = d_readers[idx]->nitems_read();
+            }
+        }
+
+#ifdef BUFFER_DEBUG
+        std::ostringstream msg;
+        msg << "[" << this << "] "
+            << "output_blocked_callback() WR_idx: " << d_write_index
+            << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx
+            << " -- RD items: " << min_read_idx_nitems << " -- shortcircuit: "
+            << ((min_read_idx == 0) || (min_read_idx > d_write_index) ||
+                (min_read_idx == d_write_index &&
+                 min_read_idx_nitems != nitems_written()))
+            << " -- to_move_items: " << (d_write_index - min_read_idx)
+            << " -- space_avail: " << space_avail << " -- force: " << force;
+        GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+        // Make sure we have enough room to start writing back at the beginning
+        if ((min_read_idx == 0) || (min_read_idx > d_write_index) ||
+            (min_read_idx == d_write_index && min_read_idx_nitems != nitems_written())) {
+            return false;
+        }
+
+        // Determine how much "to be read" data needs to be moved
+        int to_move_items = d_write_index - min_read_idx;
+        if (to_move_items > 0) {
+            uint32_t to_move_bytes = to_move_items * d_sizeof_item;
+
+            // Shift "to be read" data back to the beginning of the buffer
+            memmove_func(
+                buffer_ptr, buffer_ptr + (min_read_idx * d_sizeof_item), to_move_bytes);
+        }
+
+        // Adjust write index and each reader index
+        d_write_index -= min_read_idx;
+
+        for (size_t idx = 0; idx < d_readers.size(); ++idx) {
+            d_readers[idx]->d_read_index -= min_read_idx;
+        }
+
+        return true;
+    }
+
+    return false;
+}
+
 } /* namespace gr */
diff --git a/gnuradio-runtime/lib/buffer_type.cc b/gnuradio-runtime/lib/buffer_type.cc
deleted file mode 100644
index b667eded1e..0000000000
--- a/gnuradio-runtime/lib/buffer_type.cc
+++ /dev/null
@@ -1,18 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2020 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-#include <gnuradio/buffer_type.h>
-
-namespace gr {
-
-uint32_t buffer_type_base::s_nextId = 0;
-std::mutex buffer_type_base::s_mutex;
-
-
-} /* namespace gr */
\ No newline at end of file
diff --git a/gnuradio-runtime/lib/flat_flowgraph.cc b/gnuradio-runtime/lib/flat_flowgraph.cc
index b9986f8370..ef96ef21ac 100644
--- a/gnuradio-runtime/lib/flat_flowgraph.cc
+++ b/gnuradio-runtime/lib/flat_flowgraph.cc
@@ -15,6 +15,7 @@
 #include "flat_flowgraph.h"
 #include <gnuradio/block_detail.h>
 #include <gnuradio/buffer.h>
+#include <gnuradio/buffer_double_mapped.h>
 #include <gnuradio/buffer_reader.h>
 #include <gnuradio/buffer_type.h>
 #include <gnuradio/integer_math.h>
@@ -87,6 +88,7 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
     // Determine the downstream max per output port
     std::vector<int> downstream_max_nitems(noutputs, 0);
     std::vector<uint64_t> downstream_lcm_nitems(noutputs, 1);
+    std::vector<uint32_t> downstream_max_out_mult(noutputs, 1);
 
 #ifdef BUFFER_DEBUG
     std::ostringstream msg;
@@ -96,6 +98,7 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
     for (int i = 0; i < noutputs; i++) {
         int nitems = 0;
         uint64_t lcm_nitems = 1;
+        uint32_t max_out_multiple = 1;
         basic_block_vector_t downstream_blocks = calc_downstream_blocks(grblock, i);
         for (basic_block_viter_t blk = downstream_blocks.begin();
              blk != downstream_blocks.end();
@@ -116,8 +119,8 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
             double decimation = (1.0 / dgrblock->relative_rate());
             int multiple = dgrblock->output_multiple();
             int history = dgrblock->history();
-            nitems =
-                std::max(nitems, static_cast<int>(2 * (decimation * multiple + history)));
+            nitems = std::max(
+                nitems, static_cast<int>(2 * (decimation * multiple + (history - 1))));
 
             // Calculate the LCM of downstream reader nitems
 #ifdef BUFFER_DEBUG
@@ -131,6 +134,7 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
                                     (uint64_t)(dgrblock->fixed_rate_noutput_to_ninput(1) -
                                                (dgrblock->history() - 1)));
             }
+
             if (dgrblock->relative_rate() != 1.0) {
                 // Relative rate
                 lcm_nitems = GR_LCM(lcm_nitems, dgrblock->relative_rate_d());
@@ -141,6 +145,10 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
                 lcm_nitems = 1;
             }
 
+            if (static_cast<uint32_t>(multiple) > max_out_multiple) {
+                max_out_multiple = multiple;
+            }
+
 #ifdef BUFFER_DEBUG
             msg.str("");
             msg << "        NINPUT_ITEMS: " << nitems;
@@ -161,11 +169,15 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
         }
         downstream_max_nitems[i] = nitems;
         downstream_lcm_nitems[i] = lcm_nitems;
+        downstream_max_out_mult[i] = max_out_multiple;
     }
 
     // Allocate the block detail and necessary buffers
-    grblock->allocate_detail(
-        ninputs, noutputs, downstream_max_nitems, downstream_lcm_nitems);
+    grblock->allocate_detail(ninputs,
+                             noutputs,
+                             downstream_max_nitems,
+                             downstream_lcm_nitems,
+                             downstream_max_out_mult);
 }
 
 void flat_flowgraph::connect_block_inputs(basic_block_sptr block)
@@ -189,26 +201,46 @@ void flat_flowgraph::connect_block_inputs(basic_block_sptr block)
         if (!src_grblock)
             throw std::runtime_error("connect_block_inputs found non-gr::block");
 
+        // In order to determine the buffer context, we need to examine both
+        // the upstream and the downstream buffer_types
+        buffer_type src_buf_type =
+            src_grblock->output_signature()->stream_buffer_type(src_port);
+        buffer_type dest_buf_type =
+            grblock->input_signature()->stream_buffer_type(dst_port);
+
+        buffer_context context;
+        if (src_buf_type == buffer_double_mapped::type &&
+            dest_buf_type == buffer_double_mapped::type) {
+            context = buffer_context::HOST_TO_HOST;
+        } else if (src_buf_type != buffer_double_mapped::type &&
+                   dest_buf_type == buffer_double_mapped::type) {
+            context = buffer_context::DEVICE_TO_HOST;
+        } else if (src_buf_type == buffer_double_mapped::type &&
+                   dest_buf_type != buffer_double_mapped::type) {
+            context = buffer_context::HOST_TO_DEVICE;
+        } else if (src_buf_type != buffer_double_mapped::type &&
+                   dest_buf_type != buffer_double_mapped::type) {
+            context = buffer_context::DEVICE_TO_DEVICE;
+        }
+
         buffer_sptr src_buffer;
-        buffer_type_t src_buf_type = src_grblock->get_buffer_type();
-        buffer_type_t dest_buf_type = grblock->get_buffer_type();
-        if (dest_buf_type == buftype_DEFAULT_NON_CUSTOM::get() ||
+        if (dest_buf_type == buffer_double_mapped::type ||
             dest_buf_type == src_buf_type) {
             // The block is not using a custom buffer OR the block and the upstream
             // block both use the same kind of custom buffer
             src_buffer = src_grblock->detail()->output(src_port);
         } else {
-            if (dest_buf_type != buftype_DEFAULT_NON_CUSTOM::get() &&
-                src_buf_type == buftype_DEFAULT_NON_CUSTOM::get()) {
+            if (dest_buf_type != buffer_double_mapped::type &&
+                src_buf_type == buffer_double_mapped::type) {
                 // The block uses a custom buffer but the upstream block does not
                 // therefore the upstream block's buffer can be replaced with the
                 // type of buffer that the block needs
                 std::ostringstream msg;
                 msg << "Block: " << grblock->identifier()
-                    << "replacing upstream block: " << src_grblock->identifier()
+                    << " replacing upstream block: " << src_grblock->identifier()
                     << " buffer with a custom buffer";
-                GR_LOG_DEBUG(d_debug_logger, msg.str());
-                src_buffer = src_grblock->replace_buffer(src_port, grblock);
+                GR_LOG_DEBUG(d_logger, msg.str());
+                src_buffer = src_grblock->replace_buffer(src_port, dst_port, grblock);
             } else {
                 // Both the block and upstream block use incompatible buffer types
                 // which is not currently allowed
@@ -223,9 +255,13 @@ void flat_flowgraph::connect_block_inputs(basic_block_sptr block)
             }
         }
 
-        GR_LOG_DEBUG(d_debug_logger,
-                     "Setting input " + std::to_string(dst_port) + " from edge " +
-                         (*e).identifier());
+        // Set buffer's context
+        src_buffer->set_context(context);
+
+        std::ostringstream msg;
+        msg << "Setting input " << dst_port << " from edge " << (*e).identifier()
+            << " context: " << context;
+        GR_LOG_DEBUG(d_debug_logger, msg.str());
 
         detail->set_input(dst_port,
                           buffer_add_reader(src_buffer,
@@ -341,7 +377,6 @@ void flat_flowgraph::merge_connections(flat_flowgraph_sptr old_ffg)
         // Now deal with the fact that the block details might have
         // changed numbers of inputs and outputs vs. in the old
         // flowgraph.
-
         block->detail()->reset_nitem_counters();
         block->detail()->clear_tags();
     }
diff --git a/gnuradio-runtime/lib/host_buffer.cc b/gnuradio-runtime/lib/host_buffer.cc
new file mode 100644
index 0000000000..6e1a1d1003
--- /dev/null
+++ b/gnuradio-runtime/lib/host_buffer.cc
@@ -0,0 +1,255 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2021 BlackLynx Inc..
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+#include <cstring>
+#include <sstream>
+#include <stdexcept>
+
+#include <gnuradio/block.h>
+#include <gnuradio/host_buffer.h>
+
+namespace gr {
+
+buffer_type host_buffer::type(buftype_HOST_BUFFER{});
+
+void* host_buffer::device_memcpy(void* dest, const void* src, std::size_t count)
+{
+    // There is no spoon...er... device so fake it out using regular memcpy
+    return std::memcpy(dest, src, count);
+}
+
+void* host_buffer::device_memmove(void* dest, const void* src, std::size_t count)
+{
+    // There is no spoon...er... device so fake it out using regular memmmove
+    return std::memmove(dest, src, count);
+}
+
+
+host_buffer::host_buffer(int nitems,
+                         size_t sizeof_item,
+                         uint64_t downstream_lcm_nitems,
+                         uint32_t downstream_max_out_mult,
+                         block_sptr link,
+                         block_sptr buf_owner)
+    : buffer_single_mapped(nitems,
+                           sizeof_item,
+                           downstream_lcm_nitems,
+                           downstream_max_out_mult,
+                           link,
+                           buf_owner),
+      d_device_base(nullptr)
+{
+    gr::configure_default_loggers(d_logger, d_debug_logger, "host_buffer");
+    if (!allocate_buffer(nitems))
+        throw std::bad_alloc();
+}
+
+host_buffer::~host_buffer() {}
+
+void host_buffer::post_work(int nitems)
+{
+#ifdef BUFFER_DEBUG
+    std::ostringstream msg;
+    msg << "[" << this << "] "
+        << "host_buffer [" << d_context << "] -- post_work: " << nitems;
+    GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+    if (nitems <= 0) {
+        return;
+    }
+
+    // NOTE: when this function is called the write pointer has not yet been
+    // advanced so it can be used directly as the source ptr
+    switch (d_context) {
+    case buffer_context::HOST_TO_DEVICE: {
+        // Copy data from host buffer to device buffer
+        void* dest_ptr = &d_device_base[d_write_index * d_sizeof_item];
+        device_memcpy(dest_ptr, write_pointer(), nitems * d_sizeof_item);
+    } break;
+
+    case buffer_context::DEVICE_TO_HOST: {
+        // Copy data from device buffer to host buffer
+        void* dest_ptr = &d_base[d_write_index * d_sizeof_item];
+        device_memcpy(dest_ptr, write_pointer(), nitems * d_sizeof_item);
+    } break;
+
+    case buffer_context::DEVICE_TO_DEVICE:
+        // No op
+        break;
+
+    default:
+        std::ostringstream msg;
+        msg << "Unexpected context for host_buffer: " << d_context;
+        GR_LOG_ERROR(d_logger, msg.str());
+        throw std::runtime_error(msg.str());
+    }
+}
+
+bool host_buffer::do_allocate_buffer(size_t final_nitems, size_t sizeof_item)
+{
+#ifdef BUFFER_DEBUG
+    {
+        std::ostringstream msg;
+        msg << "[" << this << "] "
+            << "host_buffer constructor -- nitems: " << final_nitems;
+        GR_LOG_DEBUG(d_logger, msg.str());
+    }
+#endif
+
+    // This is the host buffer
+    d_buffer.reset(new char[final_nitems * sizeof_item]());
+    d_base = d_buffer.get();
+
+    // This is the simulated device buffer
+    d_device_buf.reset(new char[final_nitems * sizeof_item]());
+    d_device_base = d_device_buf.get();
+
+    return true;
+}
+
+void* host_buffer::write_pointer()
+{
+    void* ptr = nullptr;
+    switch (d_context) {
+    case buffer_context::HOST_TO_DEVICE:
+        // Write into host buffer
+        ptr = &d_base[d_write_index * d_sizeof_item];
+        break;
+
+    case buffer_context::DEVICE_TO_HOST:
+    case buffer_context::DEVICE_TO_DEVICE:
+        // Write into "device" buffer
+        ptr = &d_device_base[d_write_index * d_sizeof_item];
+        break;
+
+    default:
+        std::ostringstream msg;
+        msg << "Unexpected context for host_buffer: " << d_context;
+        GR_LOG_ERROR(d_logger, msg.str());
+        throw std::runtime_error(msg.str());
+    }
+
+    return ptr;
+}
+
+const void* host_buffer::_read_pointer(unsigned int read_index)
+{
+    void* ptr = nullptr;
+    switch (d_context) {
+    case buffer_context::HOST_TO_DEVICE:
+    case buffer_context::DEVICE_TO_DEVICE:
+        // Read from "device" buffer
+        ptr = &d_device_base[read_index * d_sizeof_item];
+        break;
+
+    case buffer_context::DEVICE_TO_HOST:
+        // Read from host buffer
+        ptr = &d_base[read_index * d_sizeof_item];
+        break;
+
+    default:
+        std::ostringstream msg;
+        msg << "Unexpected context for host_buffer: " << d_context;
+        GR_LOG_ERROR(d_logger, msg.str());
+        throw std::runtime_error(msg.str());
+    }
+
+    return ptr;
+}
+
+bool host_buffer::input_blocked_callback(int items_required,
+                                         int items_avail,
+                                         unsigned read_index)
+{
+#ifdef BUFFER_DEBUG
+    std::ostringstream msg;
+    msg << "[" << this << "] "
+        << "host_buffer [" << d_context << "] -- input_blocked_callback";
+    GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+    bool rc = false;
+    switch (d_context) {
+    case buffer_context::HOST_TO_DEVICE:
+    case buffer_context::DEVICE_TO_DEVICE:
+        // Adjust "device" buffer
+        rc = input_blocked_callback_logic(items_required,
+                                          items_avail,
+                                          read_index,
+                                          d_device_base,
+                                          host_buffer::device_memcpy,
+                                          host_buffer::device_memmove);
+        break;
+
+    case buffer_context::DEVICE_TO_HOST:
+    case buffer_context::HOST_TO_HOST:
+        // Adjust host buffer
+        rc = input_blocked_callback_logic(
+            items_required, items_avail, read_index, d_base, std::memcpy, std::memmove);
+        break;
+
+    default:
+        std::ostringstream msg;
+        msg << "Unexpected context for host_buffer: " << d_context;
+        GR_LOG_ERROR(d_logger, msg.str());
+        throw std::runtime_error(msg.str());
+    }
+
+    return rc;
+}
+
+bool host_buffer::output_blocked_callback(int output_multiple, bool force)
+{
+#ifdef BUFFER_DEBUG
+    std::ostringstream msg;
+    msg << "[" << this << "] "
+        << "host_buffer [" << d_context << "] -- output_blocked_callback";
+    GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+    bool rc = false;
+    switch (d_context) {
+    case buffer_context::HOST_TO_DEVICE:
+    case buffer_context::HOST_TO_HOST:
+        // Adjust host buffer
+        rc = output_blocked_callback_logic(output_multiple, force, d_base, std::memmove);
+        break;
+
+    case buffer_context::DEVICE_TO_HOST:
+    case buffer_context::DEVICE_TO_DEVICE:
+        // Adjust "device" buffer
+        rc = output_blocked_callback_logic(
+            output_multiple, force, d_device_base, host_buffer::device_memmove);
+        break;
+
+    default:
+        std::ostringstream msg;
+        msg << "Unexpected context for host_buffer: " << d_context;
+        GR_LOG_ERROR(d_logger, msg.str());
+        throw std::runtime_error(msg.str());
+    }
+
+    return rc;
+}
+
+buffer_sptr host_buffer::make_host_buffer(int nitems,
+                                          size_t sizeof_item,
+                                          uint64_t downstream_lcm_nitems,
+                                          uint32_t downstream_max_out_mult,
+                                          block_sptr link,
+                                          block_sptr buf_owner)
+{
+    return buffer_sptr(new host_buffer(nitems,
+                                       sizeof_item,
+                                       downstream_lcm_nitems,
+                                       downstream_max_out_mult,
+                                       link,
+                                       buf_owner));
+}
+
+} /* namespace gr */
diff --git a/gnuradio-runtime/lib/io_signature.cc b/gnuradio-runtime/lib/io_signature.cc
index 507268d5ce..e953c7eb5a 100644
--- a/gnuradio-runtime/lib/io_signature.cc
+++ b/gnuradio-runtime/lib/io_signature.cc
@@ -22,47 +22,69 @@ gr::io_signature::sptr io_signature::makev(int min_streams,
                                            int max_streams,
                                            const std::vector<int>& sizeof_stream_items)
 {
+    gr_vector_buffer_type buftypes(sizeof_stream_items.size(),
+                                   buffer_double_mapped::type);
     return gr::io_signature::sptr(
-        new io_signature(min_streams, max_streams, sizeof_stream_items));
+        new io_signature(min_streams, max_streams, sizeof_stream_items, buftypes));
 }
 
-gr::io_signature::sptr
-io_signature::make(int min_streams, int max_streams, int sizeof_stream_item)
+gr::io_signature::sptr io_signature::makev(int min_streams,
+                                           int max_streams,
+                                           const std::vector<int>& sizeof_stream_items,
+                                           gr_vector_buffer_type buftypes)
 {
-    std::vector<int> sizeof_items(1);
-    sizeof_items[0] = sizeof_stream_item;
-    return io_signature::makev(min_streams, max_streams, sizeof_items);
+    return gr::io_signature::sptr(
+        new io_signature(min_streams, max_streams, sizeof_stream_items, buftypes));
+}
+
+gr::io_signature::sptr io_signature::make(int min_streams,
+                                          int max_streams,
+                                          int sizeof_stream_item,
+                                          buffer_type buftype)
+{
+    std::vector<int> sizeof_items{ sizeof_stream_item };
+    gr_vector_buffer_type buftypes{ buftype };
+    return io_signature::makev(min_streams, max_streams, sizeof_items, buftypes);
 }
 
 gr::io_signature::sptr io_signature::make2(int min_streams,
                                            int max_streams,
                                            int sizeof_stream_item1,
-                                           int sizeof_stream_item2)
+                                           int sizeof_stream_item2,
+                                           buffer_type buftype1,
+                                           buffer_type buftype2)
 {
-    std::vector<int> sizeof_items(2);
-    sizeof_items[0] = sizeof_stream_item1;
-    sizeof_items[1] = sizeof_stream_item2;
-    return io_signature::makev(min_streams, max_streams, sizeof_items);
+    std::vector<int> sizeof_items{ sizeof_stream_item1, sizeof_stream_item2 };
+    gr_vector_buffer_type buftypes{ buftype1, buftype2 };
+    return io_signature::makev(min_streams, max_streams, sizeof_items, buftypes);
 }
 
 gr::io_signature::sptr io_signature::make3(int min_streams,
                                            int max_streams,
                                            int sizeof_stream_item1,
                                            int sizeof_stream_item2,
-                                           int sizeof_stream_item3)
+                                           int sizeof_stream_item3,
+                                           buffer_type buftype1,
+                                           buffer_type buftype2,
+                                           buffer_type buftype3)
 {
-    std::vector<int> sizeof_items(3);
-    sizeof_items[0] = sizeof_stream_item1;
-    sizeof_items[1] = sizeof_stream_item2;
-    sizeof_items[2] = sizeof_stream_item3;
-    return io_signature::makev(min_streams, max_streams, sizeof_items);
+    std::vector<int> sizeof_items{ sizeof_stream_item1,
+                                   sizeof_stream_item2,
+                                   sizeof_stream_item3 };
+    gr_vector_buffer_type buftypes{ buftype1, buftype2, buftype3 };
+    return io_signature::makev(min_streams, max_streams, sizeof_items, buftypes);
 }
 
 // ------------------------------------------------------------------------
 
 io_signature::io_signature(int min_streams,
                            int max_streams,
-                           const std::vector<int>& sizeof_stream_items)
+                           const std::vector<int>& sizeof_stream_items,
+                           gr_vector_buffer_type buftypes)
+    : d_min_streams(min_streams),
+      d_max_streams(max_streams),
+      d_sizeof_stream_item(sizeof_stream_items),
+      d_stream_buffer_type(buftypes)
 {
     if (min_streams < 0 || (max_streams != IO_INFINITE && max_streams < min_streams))
         throw std::invalid_argument("gr::io_signature(1)");
@@ -75,10 +97,6 @@ io_signature::io_signature(int min_streams,
         if (max_streams != 0 && sizeof_stream_items[i] < 1)
             throw std::invalid_argument("gr::io_signature(3)");
     }
-
-    d_min_streams = min_streams;
-    d_max_streams = max_streams;
-    d_sizeof_stream_item = sizeof_stream_items;
 }
 
 io_signature::~io_signature() {}
@@ -97,4 +115,14 @@ std::vector<int> io_signature::sizeof_stream_items() const
     return d_sizeof_stream_item;
 }
 
+buffer_type io_signature::stream_buffer_type(size_t index) const
+{
+    return d_stream_buffer_type[std::min(index, d_stream_buffer_type.size() - 1)];
+}
+
+gr_vector_buffer_type io_signature::stream_buffer_types() const
+{
+    return d_stream_buffer_type;
+}
+
 } /* namespace gr */
diff --git a/gnuradio-runtime/lib/qa_buffer.cc b/gnuradio-runtime/lib/qa_buffer.cc
index cefe548338..1c97fd070d 100644
--- a/gnuradio-runtime/lib/qa_buffer.cc
+++ b/gnuradio-runtime/lib/qa_buffer.cc
@@ -43,7 +43,7 @@ static void t0_body()
     int counter = 0;
 
     gr::buffer_sptr buf(
-        gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
+        gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr()));
 
     int last_sa;
     int sa;
@@ -78,7 +78,7 @@ static void t1_body()
     int read_counter = 0;
 
     gr::buffer_sptr buf(
-        gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
+        gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr()));
     gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr()));
 
     int sa;
@@ -150,7 +150,7 @@ static void t2_body()
     int nitems = (64 * (1L << 10)) / sizeof(int); // 64K worth of ints
 
     gr::buffer_sptr buf(
-        gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
+        gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr()));
     gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr()));
 
     int read_counter = 0;
@@ -216,7 +216,7 @@ static void t3_body()
 
     static const int N = 5;
     gr::buffer_sptr buf(
-        gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
+        gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr()));
     gr::buffer_reader_sptr reader[N];
     int read_counter[N];
     int write_counter = 0;
diff --git a/gnuradio-runtime/lib/qa_host_buffer.cc b/gnuradio-runtime/lib/qa_host_buffer.cc
new file mode 100644
index 0000000000..e8f29afe8d
--- /dev/null
+++ b/gnuradio-runtime/lib/qa_host_buffer.cc
@@ -0,0 +1,334 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2021 BlackLynx Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <gnuradio/block.h>
+#include <gnuradio/buffer_context.h>
+#include <gnuradio/buffer_reader.h>
+#include <gnuradio/host_buffer.h>
+#include <gnuradio/random.h>
+#include <boost/test/unit_test.hpp>
+#include <cstdlib>
+#include <iostream>
+
+// This is a trivial mocked up block inspired by gr::blocks::nop that is used
+// only as a placeholder for testing host_buffer below.
+class nop : public gr::block
+{
+public:
+    typedef std::shared_ptr<nop> sptr;
+    static sptr make(size_t sizeof_stream_item)
+    {
+        return gnuradio::make_block_sptr<nop>(sizeof_stream_item);
+    }
+
+    nop(size_t sizeof_stream_item)
+        : block("nop",
+                gr::io_signature::make(0, -1, sizeof_stream_item),
+                gr::io_signature::make(0, -1, sizeof_stream_item))
+    {
+    }
+
+    ~nop() override {}
+
+    int general_work(int noutput_items,
+                     gr_vector_int& ninput_items,
+                     gr_vector_const_void_star& input_items,
+                     gr_vector_void_star& output_items) override
+    {
+        // eat any input that's available
+        for (unsigned i = 0; i < ninput_items.size(); i++)
+            consume(i, ninput_items[i]);
+
+        return noutput_items;
+    }
+};
+
+
+// ----------------------------------------------------------------------------
+// Basic checks for buffer_single_mapped using the host_buffer implementation
+// of the interface for testing.
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t0)
+{
+    int nitems = 65536 / sizeof(int);
+    gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+    gr::buffer_sptr buf(
+        gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+    buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+    gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+
+    BOOST_CHECK(buf->space_available() == nitems);
+    BOOST_CHECK(rdr1->items_available() == 0);
+
+    for (int idx = 1; idx <= 16; ++idx) {
+        buf->update_write_pointer(1000);
+        BOOST_CHECK(buf->space_available() == (nitems - (idx * 1000)));
+
+        BOOST_CHECK(rdr1->items_available() == (idx * 1000));
+    }
+
+    BOOST_CHECK(buf->space_available() == 384);
+
+    buf->update_write_pointer(buf->space_available());
+    BOOST_CHECK(buf->space_available() == 0);
+    BOOST_CHECK(rdr1->items_available() == nitems);
+    BOOST_CHECK(buf->space_available() == 0);
+}
+
+// ----------------------------------------------------------------------------
+// Basic checks for buffer_single_mapped using the host_buffer implementation
+// of the interface for testing.
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t1)
+{
+    int nitems = 65536 / sizeof(int);
+    gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+    gr::buffer_sptr buf(
+        gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+    buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+    gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+
+    int space = buf->space_available();
+    BOOST_CHECK(nitems == space);
+
+    BOOST_CHECK(rdr1->items_available() == 0);
+
+    buf->update_write_pointer(nitems);
+    BOOST_CHECK(buf->space_available() == 0);
+    BOOST_CHECK(rdr1->items_available() == nitems);
+
+    for (int idx = 1; idx <= 16; ++idx) {
+        rdr1->update_read_pointer(1000);
+        BOOST_CHECK(rdr1->items_available() == (nitems - (idx * 1000)));
+
+        space = buf->space_available();
+        BOOST_CHECK(space == (idx * 1000));
+    }
+
+    BOOST_CHECK(rdr1->items_available() == 384);
+    rdr1->update_read_pointer(384);
+    BOOST_CHECK(rdr1->items_available() == 0);
+}
+
+// ----------------------------------------------------------------------------
+// Basic check reader/write wrapping of buffer_single_mapped with 1 reader.
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t2)
+{
+    int nitems = 65536 / sizeof(int);
+    gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+    gr::buffer_sptr buf(
+        gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+    buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+    gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+
+    int space = buf->space_available();
+    BOOST_CHECK(nitems == space);
+    BOOST_CHECK(rdr1->items_available() == 0);
+
+    buf->update_write_pointer(nitems);
+    BOOST_CHECK(buf->space_available() == 0);
+
+    for (int idx = 1; idx <= 16; ++idx) {
+        rdr1->update_read_pointer(1000);
+        BOOST_CHECK(rdr1->items_available() == (nitems - (idx * 1000)));
+
+        space = buf->space_available();
+
+        if (idx <= 9)
+            BOOST_CHECK(space == (idx * 1000));
+        else
+            BOOST_CHECK(space == ((idx * 1000) - (nitems / 2)));
+
+        if (idx == 9) {
+            buf->update_write_pointer(nitems / 2);
+        }
+    }
+
+    // At this point we can only read up until the end of the buffer even though
+    // additional data is available at the beginning of the buffer
+    BOOST_CHECK(rdr1->items_available() == 384);
+    rdr1->update_read_pointer(384);
+
+    // Now the (nitems / 2) at the beginning of the buffer should be available
+    BOOST_CHECK(rdr1->items_available() == (nitems / 2));
+
+    for (int idx = 0; idx < 4; ++idx)
+        rdr1->update_read_pointer(1024);
+
+    BOOST_CHECK(buf->space_available() == (nitems / 2));
+    BOOST_CHECK(rdr1->items_available() == (nitems / 4));
+
+    for (int idx = 0; idx < 4; ++idx)
+        rdr1->update_read_pointer(1000);
+
+    BOOST_CHECK(buf->space_available() == (nitems / 2));
+    BOOST_CHECK(rdr1->items_available() == 96);
+
+    rdr1->update_read_pointer(96);
+    BOOST_CHECK(rdr1->items_available() == 0);
+
+    BOOST_CHECK(buf->space_available() == (nitems / 2));
+}
+
+// ----------------------------------------------------------------------------
+// Basic check reader/write wrapping of buffer_single_mapped with 2 readers.
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t3)
+{
+    int nitems = 65536 / sizeof(int);
+    gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+    gr::buffer_sptr buf(
+        gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+    buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+    gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+    gr::buffer_reader_sptr rdr2(gr::buffer_add_reader(buf, 0, nop));
+
+    int space = buf->space_available();
+    BOOST_CHECK(nitems == space);
+    BOOST_CHECK(rdr1->items_available() == 0);
+    BOOST_CHECK(rdr2->items_available() == 0);
+
+    buf->update_write_pointer(nitems);
+    BOOST_CHECK(buf->space_available() == 0);
+    BOOST_CHECK(rdr1->items_available() == nitems);
+    BOOST_CHECK(rdr2->items_available() == nitems);
+
+    for (int idx = 1; idx <= 16; ++idx) {
+        rdr1->update_read_pointer(1000);
+        BOOST_CHECK(rdr1->items_available() == (nitems - (idx * 1000)));
+
+        // Reader 2 hasn't read anything so space available should remain 0
+        BOOST_CHECK(buf->space_available() == 0);
+    }
+
+    int last_rdr1_available = rdr1->items_available();
+    int increment = last_rdr1_available / 4;
+
+    for (int idx = 1; idx <= 16; ++idx) {
+        rdr2->update_read_pointer(1000);
+        BOOST_CHECK(rdr2->items_available() == (nitems - (idx * 1000)));
+
+        BOOST_CHECK(rdr1->items_available() == last_rdr1_available);
+        if (idx % 4 == 0) {
+            rdr1->update_read_pointer(increment);
+            BOOST_CHECK(rdr1->items_available() == (last_rdr1_available - increment));
+            last_rdr1_available = rdr1->items_available();
+        }
+
+        BOOST_CHECK(buf->space_available() == (idx * 1000));
+    }
+}
+
+// ----------------------------------------------------------------------------
+// Basic check of output blocked callback
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t4)
+{
+    int nitems = 65536 / sizeof(int);
+    gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+    gr::buffer_sptr buf(
+        gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+    buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+    gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+
+    BOOST_CHECK(nitems == buf->space_available());
+    BOOST_CHECK(rdr1->items_available() == 0);
+
+    buf->update_write_pointer(nitems / 2);
+    BOOST_CHECK(buf->space_available() == (nitems / 2));
+    BOOST_CHECK(rdr1->items_available() == (nitems / 2));
+
+    rdr1->update_read_pointer(nitems / 2);
+    BOOST_CHECK(buf->space_available() == (nitems / 2));
+    BOOST_CHECK(rdr1->items_available() == 0);
+
+    buf->update_write_pointer(8000);
+    BOOST_CHECK(buf->space_available() == 192);
+
+    bool ready = buf->output_blkd_cb_ready(200);
+    BOOST_CHECK(ready == true);
+
+    bool success = buf->output_blocked_callback(200);
+    BOOST_CHECK(success == true);
+    BOOST_CHECK(buf->space_available() == 8384);
+    BOOST_CHECK(rdr1->items_available() == 8000);
+
+    rdr1->update_read_pointer(4000);
+    BOOST_CHECK(buf->space_available() == 8384);
+    BOOST_CHECK(rdr1->items_available() == 4000);
+
+    buf->update_write_pointer(4000);
+    BOOST_CHECK(buf->space_available() == 4384);
+    BOOST_CHECK(rdr1->items_available() == 8000);
+
+    rdr1->update_read_pointer(8000);
+    BOOST_CHECK(buf->space_available() == 4384);
+    BOOST_CHECK(rdr1->items_available() == 0);
+}
+
+// ----------------------------------------------------------------------------
+// Basic check of input blocked callback
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t5)
+{
+    int nitems = 65536 / sizeof(int);
+    gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+    gr::buffer_sptr buf(
+        gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+    buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+    gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+
+    BOOST_CHECK(nitems == buf->space_available());
+    BOOST_CHECK(rdr1->items_available() == 0);
+
+    buf->update_write_pointer(16000);
+    BOOST_CHECK(buf->space_available() == 384);
+    BOOST_CHECK(rdr1->items_available() == 16000);
+
+    rdr1->update_read_pointer(16000);
+    BOOST_CHECK(buf->space_available() == 384);
+    BOOST_CHECK(rdr1->items_available() == 0);
+
+    buf->update_write_pointer(384);
+    BOOST_CHECK(buf->space_available() == 16000);
+    BOOST_CHECK(rdr1->items_available() == 384);
+
+    buf->update_write_pointer(116);
+    BOOST_CHECK(buf->space_available() == 15884);
+    BOOST_CHECK(rdr1->items_available() == 384);
+
+    bool ready = rdr1->input_blkd_cb_ready(400);
+    BOOST_CHECK(ready == true);
+
+    bool success = rdr1->input_blocked_callback(400, rdr1->items_available());
+    BOOST_CHECK(success == true);
+    BOOST_CHECK(rdr1->items_available() == 500);
+
+    rdr1->update_read_pointer(500);
+    BOOST_CHECK(buf->space_available() == 15884);
+    BOOST_CHECK(rdr1->items_available() == 0);
+}
-- 
cgit v1.2.3