summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/lib
diff options
context:
space:
mode:
authorDavid Sorber <david.sorber@blacklynx.tech>2021-07-29 11:34:37 -0400
committermormj <34754695+mormj@users.noreply.github.com>2021-10-25 11:27:01 -0400
commitf3c558d88bc68d865f823c31e7d9aa78b3feab59 (patch)
tree026d8ff5568bc2c5ab731df29d87901cf1177e00 /gnuradio-runtime/lib
parent788827ae116bef871e144abd39b1e4482208eabe (diff)
runtime: Custom Buffer/Accelerator Device Support - Milestone 2
Completion of custom buffer/accelerator device support changes: * Improved custom buffer interface by removing awkward memory allocation functions from the block class * Increased flexibility for creating custom buffers by allowing creation of buffer_single_mapped subclasses * Fully incorporated data movement abstraction into the custom buffer interface and the runtime itself; accelerated blocks are no longer directly responsible for their own data movement * Zero copy back-to-back accelerated blocks are now supported (data no longer needs to be moved back to the host between each block) Signed-off-by: David Sorber <david.sorber@blacklynx.tech> Signed-off-by: Mike Mason <mike.mason@blacklynx.tech>
Diffstat (limited to 'gnuradio-runtime/lib')
-rw-r--r--gnuradio-runtime/lib/CMakeLists.txt4
-rw-r--r--gnuradio-runtime/lib/block.cc63
-rw-r--r--gnuradio-runtime/lib/block_detail.cc2
-rw-r--r--gnuradio-runtime/lib/block_executor.cc22
-rw-r--r--gnuradio-runtime/lib/buffer.cc71
-rw-r--r--gnuradio-runtime/lib/buffer_context.cc32
-rw-r--r--gnuradio-runtime/lib/buffer_double_mapped.cc28
-rw-r--r--gnuradio-runtime/lib/buffer_reader.cc26
-rw-r--r--gnuradio-runtime/lib/buffer_reader_sm.cc85
-rw-r--r--gnuradio-runtime/lib/buffer_single_mapped.cc337
-rw-r--r--gnuradio-runtime/lib/buffer_type.cc18
-rw-r--r--gnuradio-runtime/lib/flat_flowgraph.cc67
-rw-r--r--gnuradio-runtime/lib/host_buffer.cc255
-rw-r--r--gnuradio-runtime/lib/io_signature.cc72
-rw-r--r--gnuradio-runtime/lib/qa_buffer.cc8
-rw-r--r--gnuradio-runtime/lib/qa_host_buffer.cc334
16 files changed, 1089 insertions, 335 deletions
diff --git a/gnuradio-runtime/lib/CMakeLists.txt b/gnuradio-runtime/lib/CMakeLists.txt
index ca96549386..6fcd31bac4 100644
--- a/gnuradio-runtime/lib/CMakeLists.txt
+++ b/gnuradio-runtime/lib/CMakeLists.txt
@@ -55,16 +55,17 @@ add_library(gnuradio-runtime
block_gateway_impl.cc
block_registry.cc
buffer.cc
+ buffer_context.cc
buffer_double_mapped.cc
buffer_reader.cc
buffer_reader_sm.cc
buffer_single_mapped.cc
- buffer_type.cc
flat_flowgraph.cc
flowgraph.cc
hier_block2.cc
hier_block2_detail.cc
high_res_timer.cc
+ host_buffer.cc
io_signature.cc
local_sighandler.cc
logger.cc
@@ -335,6 +336,7 @@ if(ENABLE_TESTING)
qa_buffer.cc
qa_io_signature.cc
qa_logger.cc
+ qa_host_buffer.cc
qa_vmcircbuf.cc
)
list(APPEND GR_TEST_TARGET_DEPS gnuradio-runtime gnuradio-pmt)
diff --git a/gnuradio-runtime/lib/block.cc b/gnuradio-runtime/lib/block.cc
index bb6ce95298..75f9dc6ae4 100644
--- a/gnuradio-runtime/lib/block.cc
+++ b/gnuradio-runtime/lib/block.cc
@@ -384,7 +384,8 @@ void block::set_min_output_buffer(int port, long min_output_buffer)
void block::allocate_detail(int ninputs,
int noutputs,
const std::vector<int>& downstream_max_nitems_vec,
- const std::vector<uint64_t>& downstream_lcm_nitems_vec)
+ const std::vector<uint64_t>& downstream_lcm_nitems_vec,
+ const std::vector<uint32_t>& downstream_max_out_mult_vec)
{
block_detail_sptr detail = make_block_detail(ninputs, noutputs);
@@ -393,8 +394,10 @@ void block::allocate_detail(int ninputs,
for (int i = 0; i < noutputs; i++) {
expand_minmax_buffer(i);
- buffer_sptr buffer = allocate_buffer(
- i, downstream_max_nitems_vec[i], downstream_lcm_nitems_vec[i]);
+ buffer_sptr buffer = allocate_buffer(i,
+ downstream_max_nitems_vec[i],
+ downstream_lcm_nitems_vec[i],
+ downstream_max_out_mult_vec[i]);
GR_LOG_DEBUG(d_debug_logger,
"Allocated buffer for output " + identifier() + " " +
std::to_string(i));
@@ -413,19 +416,24 @@ void block::allocate_detail(int ninputs,
set_detail(detail);
}
-buffer_sptr block::replace_buffer(uint32_t out_port, block_sptr block_owner)
+buffer_sptr
+block::replace_buffer(uint32_t src_port, uint32_t dst_port, block_sptr block_owner)
{
block_detail_sptr detail_ = detail();
- buffer_sptr orig_buffer = detail_->output(out_port);
+ buffer_sptr orig_buffer = detail_->output(src_port);
- // Make a new buffer but this time use the passed in block as the owner
- buffer_sptr new_buffer = make_buffer(orig_buffer->bufsize(),
- orig_buffer->get_sizeof_item(),
- orig_buffer->get_downstream_lcm_nitems(),
- shared_from_base<block>(),
- block_owner);
+ buffer_type buftype = block_owner->output_signature()->stream_buffer_type(dst_port);
- detail_->set_output(out_port, new_buffer);
+ // Make a new buffer but this time use the passed in block as the owner
+ buffer_sptr new_buffer =
+ buftype.make_buffer(orig_buffer->bufsize(),
+ orig_buffer->get_sizeof_item(),
+ orig_buffer->get_downstream_lcm_nitems(),
+ orig_buffer->get_max_reader_output_multiple(),
+ shared_from_base<block>(),
+ block_owner);
+
+ detail_->set_output(src_port, new_buffer);
return new_buffer;
}
@@ -435,7 +443,8 @@ void block::enable_update_rate(bool en) { d_update_rate = en; }
buffer_sptr block::allocate_buffer(int port,
int downstream_max_nitems,
- uint64_t downstream_lcm_nitems)
+ uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult)
{
int item_size = output_signature()->sizeof_stream_item(port);
@@ -473,14 +482,16 @@ buffer_sptr block::allocate_buffer(int port,
buffer_sptr buf;
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
GR_LOG_DEBUG(d_logger,
"Block: " + name() + " allocated buffer for output " + identifier());
#endif
+ // Grab the buffer type associated with the output port and use it to
+ // create the specified type of buffer
+ buffer_type buftype = output_signature()->stream_buffer_type(port);
+
try {
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
std::ostringstream msg;
msg << "downstream_max_nitems: " << downstream_max_nitems
<< " -- downstream_lcm_nitems: " << downstream_lcm_nitems
@@ -498,18 +509,20 @@ buffer_sptr block::allocate_buffer(int port,
}
GR_LOG_DEBUG(d_logger, msg.str());
#endif
- buf = make_buffer(nitems,
- item_size,
- downstream_lcm_nitems,
- shared_from_base<block>(),
- shared_from_base<block>());
+ buf = buftype.make_buffer(nitems,
+ item_size,
+ downstream_lcm_nitems,
+ downstream_max_out_mult,
+ shared_from_base<block>(),
+ shared_from_base<block>());
} catch (std::bad_alloc&) {
- buf = make_buffer(nitems,
- item_size,
- downstream_lcm_nitems,
- shared_from_base<block>(),
- shared_from_base<block>());
+ buf = buftype.make_buffer(nitems,
+ item_size,
+ downstream_lcm_nitems,
+ downstream_max_out_mult,
+ shared_from_base<block>(),
+ shared_from_base<block>());
}
// Set the max noutput items size here to make sure it's always
diff --git a/gnuradio-runtime/lib/block_detail.cc b/gnuradio-runtime/lib/block_detail.cc
index f5283c56b0..66b32ec078 100644
--- a/gnuradio-runtime/lib/block_detail.cc
+++ b/gnuradio-runtime/lib/block_detail.cc
@@ -114,6 +114,7 @@ void block_detail::consume_each(int how_many_items)
void block_detail::produce(int which_output, int how_many_items)
{
if (how_many_items > 0) {
+ d_output[which_output]->post_work(how_many_items);
d_output[which_output]->update_write_pointer(how_many_items);
d_produce_or |= how_many_items;
}
@@ -123,6 +124,7 @@ void block_detail::produce_each(int how_many_items)
{
if (how_many_items > 0) {
for (int i = 0; i < noutputs(); i++) {
+ d_output[i]->post_work(how_many_items);
d_output[i]->update_write_pointer(how_many_items);
}
d_produce_or |= how_many_items;
diff --git a/gnuradio-runtime/lib/block_executor.cc b/gnuradio-runtime/lib/block_executor.cc
index 6fbf2c5c17..ea59196571 100644
--- a/gnuradio-runtime/lib/block_executor.cc
+++ b/gnuradio-runtime/lib/block_executor.cc
@@ -59,9 +59,11 @@ static int min_available_space(block* m,
int min_noutput_items,
int& output_idx)
{
+#if ENABLE_LOGGING
gr::logger_ptr logger;
gr::logger_ptr debug_logger;
gr::configure_default_loggers(logger, debug_logger, "min_available_space");
+#endif
int min_space = std::numeric_limits<int>::max();
if (min_noutput_items == 0)
@@ -285,7 +287,7 @@ block_executor::state block_executor::run_one_iteration()
// determine the minimum available output space
output_idx = 0;
- out_try_again:
+ blkd_out_try_again:
noutput_items = min_available_space(
m, d, m->output_multiple(), m->min_noutput_items(), output_idx);
noutput_items = std::min(noutput_items, max_noutput_items);
@@ -312,7 +314,7 @@ block_executor::state block_executor::run_one_iteration()
msg << m << " -- BLKD_OUT -- ([1] try again idx: " << output_idx
<< ")";
GR_LOG_INFO(d_debug_logger, msg.str()););
- goto out_try_again;
+ goto blkd_out_try_again;
}
} else {
return BLKD_OUT;
@@ -329,7 +331,6 @@ block_executor::state block_executor::run_one_iteration()
d_input_done.resize(d->ninputs());
d_output_items.resize(0);
d_start_nitems_read.resize(d->ninputs());
- // LOG(GR_LOG_INFO(d_debug_logger, "sink"););
LOG(std::ostringstream msg; msg << m << " -- sink";
GR_LOG_INFO(d_debug_logger, msg.str()););
@@ -367,7 +368,6 @@ block_executor::state block_executor::run_one_iteration()
GR_LOG_INFO(d_debug_logger, msg.str()););
if (noutput_items == 0) { // we're blocked on input
- // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN"););
LOG(std::ostringstream msg; msg << m << " -- BLKD_IN";
GR_LOG_INFO(d_debug_logger, msg.str()));
return BLKD_IN;
@@ -400,7 +400,7 @@ block_executor::state block_executor::run_one_iteration()
// determine the minimum available output space
output_idx = 0;
- out_try_again2:
+ blkd_out_try_again2:
noutput_items = min_available_space(
m, d, m->output_multiple(), m->min_noutput_items(), output_idx);
if (ENABLE_LOGGING) {
@@ -415,7 +415,6 @@ block_executor::state block_executor::run_one_iteration()
goto were_done;
if (noutput_items == 0) { // we're output blocked
- // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT"););
LOG(std::ostringstream msg; msg << m << " -- BLKD_OUT";
GR_LOG_INFO(d_debug_logger, msg.str()));
@@ -435,7 +434,7 @@ block_executor::state block_executor::run_one_iteration()
msg << m << " -- BLKD_OUT -- ([2] try again idx: " << output_idx
<< ")";
GR_LOG_INFO(d_debug_logger, msg.str()););
- goto out_try_again2;
+ goto blkd_out_try_again2;
}
} else {
return BLKD_OUT;
@@ -532,10 +531,6 @@ block_executor::state block_executor::run_one_iteration()
buffer_reader_sptr in_buf = d->input(i);
- LOG(std::ostringstream msg;
- msg << m << " (t: " << this << ") -- pre-callback";
- GR_LOG_DEBUG(d_debug_logger, msg.str()));
-
if (in_buf->input_blkd_cb_ready(d_ninput_items_required[i])) {
gr::custom_lock lock(std::ref(*in_buf->mutex()), in_buf->buffer());
if (in_buf->input_blocked_callback(d_ninput_items_required[i],
@@ -681,8 +676,8 @@ block_executor::state block_executor::run_one_iteration()
GR_LOG_DEBUG(d_debug_logger, msg.str()););
gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf);
out_buf->output_blocked_callback(m->output_multiple(), true);
- LOG(std::ostringstream msg; msg << m << " -- NO OUTPUT -- [" << i
- << "] -- OUTPUT BLOCKED CBACK: " << rc;
+ LOG(std::ostringstream msg;
+ msg << m << " -- NO OUTPUT -- [" << i << "] -- OUTPUT BLOCKED CBACK ";
GR_LOG_DEBUG(d_debug_logger, msg.str()););
}
@@ -692,7 +687,6 @@ block_executor::state block_executor::run_one_iteration()
GR_LOG_ERROR(d_logger, "invalid state while going through iteration state machine");
were_done:
- // LOG(GR_LOG_INFO(d_debug_logger, "we're done"););
LOG(std::ostringstream msg; msg << m << " -- we're done";
GR_LOG_INFO(d_debug_logger, msg.str()));
d->set_done(true);
diff --git a/gnuradio-runtime/lib/buffer.cc b/gnuradio-runtime/lib/buffer.cc
index 7fd0a39579..75fc447e8d 100644
--- a/gnuradio-runtime/lib/buffer.cc
+++ b/gnuradio-runtime/lib/buffer.cc
@@ -12,6 +12,7 @@
#include "config.h"
#endif
#include "vmcircbuf.h"
+#include <gnuradio/block.h>
#include <gnuradio/buffer.h>
#include <gnuradio/buffer_double_mapped.h>
#include <gnuradio/buffer_reader.h>
@@ -56,10 +57,11 @@ static long s_buffer_count = 0; // counts for debugging storage mgmt
---------------------------------------------------------------------------- */
-buffer::buffer(BufferMappingType buf_type,
+buffer::buffer(buffer_mapping_type buf_type,
int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
block_sptr link)
: d_base(0),
d_bufsize(0),
@@ -76,7 +78,9 @@ buffer::buffer(BufferMappingType buf_type,
d_callback_flag(false),
d_active_pointer_counter(0),
d_downstream_lcm_nitems(downstream_lcm_nitems),
- d_write_multiple(0)
+ d_write_multiple(0),
+ d_max_reader_output_multiple(downstream_max_out_mult),
+ d_context(buffer_context::DEFAULT_INVALID)
{
gr::configure_default_loggers(d_logger, d_debug_logger, "buffer");
@@ -86,44 +90,27 @@ buffer::buffer(BufferMappingType buf_type,
buffer_sptr make_buffer(int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
block_sptr link,
block_sptr buf_owner)
{
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
gr::logger_ptr logger;
gr::logger_ptr debug_logger;
gr::configure_default_loggers(logger, debug_logger, "make_buffer");
std::ostringstream msg;
#endif
-#if DEBUG_SINGLE_MAPPED
- if (1) {
-#else
- if (buf_owner->get_buffer_type() != buftype_DEFAULT_NON_CUSTOM::get()) {
-#endif
- // Buffer type is NOT the default non custom variety so allocate a
- // buffer_single_mapped instance
-#ifdef BUFFER_DEBUG
- msg << "buffer_single_mapped nitems: " << nitems
- << " -- sizeof_item: " << sizeof_item;
- GR_LOG_DEBUG(logger, msg.str());
-#endif
-
- return buffer_sptr(new buffer_single_mapped(
- nitems, sizeof_item, downstream_lcm_nitems, link, buf_owner));
-
- } else {
- // Default to allocating a buffer_double_mapped instance
-#ifdef BUFFER_DEBUG
- msg << "buffer_double_mapped nitems: " << nitems
- << " -- sizeof_item: " << sizeof_item;
- GR_LOG_DEBUG(logger, msg.str());
-#endif
-
- return buffer_sptr(
- new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link));
- }
+ // NOTE: This function is no longer called by flat_flowgraph functions and
+ // therefore is somewhat deprecated. It will create and return a
+ // buffer_double_mapped subclass by default.
+ buffer_type buftype = buffer_double_mapped::type;
+ return buftype.make_buffer(nitems,
+ sizeof_item,
+ downstream_lcm_nitems,
+ downstream_max_out_mult,
+ link,
+ buf_owner);
}
buffer::~buffer()
@@ -134,12 +121,16 @@ buffer::~buffer()
void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; }
+const void* buffer::_read_pointer(unsigned int read_index)
+{
+ return &d_base[read_index * d_sizeof_item];
+}
+
void buffer::update_write_pointer(int nitems)
{
gr::thread::scoped_lock guard(*mutex());
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
unsigned orig_wr_idx = d_write_index;
#endif
@@ -147,7 +138,6 @@ void buffer::update_write_pointer(int nitems)
d_abs_write_offset += nitems;
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
std::ostringstream msg;
msg << "[" << this << "] update_write_pointer -- orig d_write_index: " << orig_wr_idx
<< " -- nitems: " << nitems << " -- d_write_index: " << d_write_index;
@@ -266,4 +256,21 @@ std::ostream& operator<<(std::ostream& os, const buffer& buf)
return os;
}
+void buffer::set_context(const buffer_context& context)
+{
+ if ((d_context == buffer_context::DEFAULT_INVALID) || (d_context == context)) {
+ // Set the context if the existing value is the default or if it is the
+ // same as what's already been set
+ d_context = context;
+ } else {
+ // Otherwise error out as the context value cannot be changed after
+ // it is set
+ std::ostringstream msg;
+ msg << "Block: " << link()->identifier() << " has context " << d_context
+ << " assigned. Cannot change to context " << context << ".";
+ GR_LOG_ERROR(d_logger, msg.str());
+ throw std::runtime_error(msg.str());
+ }
+}
+
} /* namespace gr */
diff --git a/gnuradio-runtime/lib/buffer_context.cc b/gnuradio-runtime/lib/buffer_context.cc
new file mode 100644
index 0000000000..9cd27add36
--- /dev/null
+++ b/gnuradio-runtime/lib/buffer_context.cc
@@ -0,0 +1,32 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2021 BlackLynx, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+#include <gnuradio/buffer_context.h>
+
+namespace gr {
+
+std::ostream& operator<<(std::ostream& os, const buffer_context& context)
+{
+ switch (context) {
+ case buffer_context::DEFAULT_INVALID:
+ return os << "DEFAULT_INVALID";
+ case buffer_context::HOST_TO_DEVICE:
+ return os << "HOST_TO_DEVICE";
+ case buffer_context::DEVICE_TO_HOST:
+ return os << "DEVICE_TO_HOST";
+ case buffer_context::HOST_TO_HOST:
+ return os << "HOST_TO_HOST";
+ case buffer_context::DEVICE_TO_DEVICE:
+ return os << "DEVICE_TO_DEVICE";
+ default:
+ return os << "Unknown buffer context: " << static_cast<int>(context);
+ }
+}
+
+} // namespace gr
diff --git a/gnuradio-runtime/lib/buffer_double_mapped.cc b/gnuradio-runtime/lib/buffer_double_mapped.cc
index ad99c2162b..700ebac0c1 100644
--- a/gnuradio-runtime/lib/buffer_double_mapped.cc
+++ b/gnuradio-runtime/lib/buffer_double_mapped.cc
@@ -36,23 +36,25 @@ static inline long minimum_buffer_items(long type_size, long page_size)
return page_size / GR_GCD(type_size, page_size);
}
+buffer_type buffer_double_mapped::type(buftype_DEFAULT_NON_CUSTOM{});
buffer_double_mapped::buffer_double_mapped(int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
block_sptr link)
- : buffer(BufferMappingType::DoubleMapped,
+ : buffer(buffer_mapping_type::double_mapped,
nitems,
sizeof_item,
downstream_lcm_nitems,
+ downstream_max_out_mult,
link)
{
gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_double_mapped");
- if (!allocate_buffer(nitems, sizeof_item))
+ if (!allocate_buffer(nitems))
throw std::bad_alloc();
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
{
std::ostringstream msg;
msg << "[" << this << "] "
@@ -62,13 +64,18 @@ buffer_double_mapped::buffer_double_mapped(int nitems,
#endif
}
+// NB: Added the extra 'block_sptr unused' parameter so that the
+// call signature matches the other factory-like functions used to create
+// the buffer_single_mapped subclasses
buffer_sptr make_buffer_double_mapped(int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
- block_sptr link)
+ uint32_t downstream_max_out_mult,
+ block_sptr link,
+ block_sptr unused)
{
- return buffer_sptr(
- new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link));
+ return buffer_sptr(new buffer_double_mapped(
+ nitems, sizeof_item, downstream_lcm_nitems, downstream_max_out_mult, link));
}
buffer_double_mapped::~buffer_double_mapped() {}
@@ -77,13 +84,13 @@ buffer_double_mapped::~buffer_double_mapped() {}
* sets d_vmcircbuf, d_base, d_bufsize.
* returns true iff successful.
*/
-bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item)
+bool buffer_double_mapped::allocate_buffer(int nitems)
{
int orig_nitems = nitems;
// Any buffer size we come up with must be a multiple of min_nitems.
int granularity = gr::vmcircbuf_sysconfig::granularity();
- int min_nitems = minimum_buffer_items(sizeof_item, granularity);
+ int min_nitems = minimum_buffer_items(d_sizeof_item, granularity);
// Round-up nitems to a multiple of min_nitems.
if (nitems % min_nitems != 0)
@@ -91,7 +98,7 @@ bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item)
// If we rounded-up a whole bunch, give the user a heads up.
// This only happens if sizeof_item is not a power of two.
- if (nitems > 2 * orig_nitems && nitems * (int)sizeof_item > granularity) {
+ if (nitems > 2 * orig_nitems && nitems * (int)d_sizeof_item > granularity) {
auto msg =
str(boost::format(
"allocate_buffer: tried to allocate"
@@ -99,7 +106,7 @@ bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item)
" %d were allocated. If this isn't OK, consider padding"
" your structure to a power-of-two bytes."
" On this platform, our allocation granularity is %d bytes.") %
- orig_nitems % sizeof_item % nitems % granularity);
+ orig_nitems % d_sizeof_item % nitems % granularity);
GR_LOG_WARN(d_logger, msg.c_str());
}
@@ -138,7 +145,6 @@ int buffer_double_mapped::space_available()
}
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
std::ostringstream msg;
msg << "[" << this << "] "
<< "space_available() called d_write_index: " << d_write_index
diff --git a/gnuradio-runtime/lib/buffer_reader.cc b/gnuradio-runtime/lib/buffer_reader.cc
index 7ead53032e..b1c1e7fb73 100644
--- a/gnuradio-runtime/lib/buffer_reader.cc
+++ b/gnuradio-runtime/lib/buffer_reader.cc
@@ -34,11 +34,11 @@ buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay
buffer_reader_sptr r;
- if (buf->get_mapping_type() == BufferMappingType::DoubleMapped) {
+ if (buf->get_mapping_type() == buffer_mapping_type::double_mapped) {
r.reset(new buffer_reader(
buf, buf->index_sub(buf->d_write_index, nzero_preload), link));
r->declare_sample_delay(delay);
- } else if (buf->get_mapping_type() == BufferMappingType::SingleMapped) {
+ } else if (buf->get_mapping_type() == buffer_mapping_type::single_mapped) {
r.reset(new buffer_reader_sm(
buf, buf->index_sub(buf->d_write_index, nzero_preload), link));
r->declare_sample_delay(delay);
@@ -51,11 +51,15 @@ buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay
buf->d_readers.push_back(r.get());
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
- std::cerr << " [" << buf.get() << ";" << r.get()
- << "] buffer_add_reader() nzero_preload " << nzero_preload
- << " -- delay: " << delay << " -- history: " << link->history()
- << " -- RD_idx: " << r->d_read_index << std::endl;
+ gr::logger_ptr logger;
+ gr::logger_ptr debug_logger;
+ gr::configure_default_loggers(logger, debug_logger, "buffer_add_reader");
+
+ std::ostringstream msg;
+ msg << " [" << buf.get() << ";" << r.get() << "] buffer_add_reader() nzero_preload "
+ << nzero_preload << " -- delay: " << delay << " -- history: " << link->history()
+ << " -- RD_idx: " << r->d_read_index;
+ GR_LOG_DEBUG(debug_logger, msg.str());
#endif
return r;
@@ -89,12 +93,11 @@ void buffer_reader::declare_sample_delay(unsigned delay)
unsigned buffer_reader::sample_delay() const { return d_attr_delay; }
-int buffer_reader::items_available() // const
+int buffer_reader::items_available() const
{
int available = d_buffer->index_sub(d_buffer->d_write_index, d_read_index);
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
std::ostringstream msg;
msg << "[" << d_buffer << ";" << this << "] "
<< "items_available() WR_idx: " << d_buffer->d_write_index
@@ -109,7 +112,8 @@ int buffer_reader::items_available() // const
const void* buffer_reader::read_pointer()
{
- return &d_buffer->d_base[d_read_index * d_buffer->d_sizeof_item];
+ // Delegate to buffer subclass
+ return d_buffer->_read_pointer(d_read_index);
}
void buffer_reader::update_read_pointer(int nitems)
@@ -117,7 +121,6 @@ void buffer_reader::update_read_pointer(int nitems)
gr::thread::scoped_lock guard(*mutex());
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
unsigned orig_rd_idx = d_read_index;
#endif
@@ -125,7 +128,6 @@ void buffer_reader::update_read_pointer(int nitems)
d_abs_read_offset += nitems;
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
std::ostringstream msg;
msg << "[" << d_buffer << ";" << this
<< "] update_read_pointer -- orig d_read_index: " << orig_rd_idx
diff --git a/gnuradio-runtime/lib/buffer_reader_sm.cc b/gnuradio-runtime/lib/buffer_reader_sm.cc
index 9e0fac584f..c0d4b07d65 100644
--- a/gnuradio-runtime/lib/buffer_reader_sm.cc
+++ b/gnuradio-runtime/lib/buffer_reader_sm.cc
@@ -26,13 +26,14 @@ namespace gr {
buffer_reader_sm::~buffer_reader_sm() {}
-int buffer_reader_sm::items_available()
+int buffer_reader_sm::items_available() const
{
int available = 0;
if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) {
if (d_buffer->d_write_index == d_read_index) {
- if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) {
+ if ((nitems_read() - sample_delay()) !=
+ (d_buffer->nitems_written() + link()->history() - 1)) {
available = d_buffer->d_bufsize - d_read_index;
}
} else {
@@ -55,87 +56,15 @@ int buffer_reader_sm::items_available()
bool buffer_reader_sm::input_blkd_cb_ready(int items_required) const
{
- gr::thread::scoped_lock(*d_buffer->mutex());
-
- return (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) &&
- (d_buffer->d_write_index < d_read_index));
+ return d_buffer->input_blkd_cb_ready(items_required, d_read_index);
}
bool buffer_reader_sm::input_blocked_callback(int items_required, int items_avail)
{
- // Maybe adjust read pointers from min read index?
- // This would mean that *all* readers must be > (passed) the write index
- if (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) &&
- (d_buffer->d_write_index < d_read_index)) {
-
- // Update items available before going farther as it could be stale
- items_avail = items_available();
-
- // Find reader with the smallest read index that is greater than the
- // write index
- uint32_t min_reader_index = std::numeric_limits<uint32_t>::max();
- uint32_t min_read_idx = std::numeric_limits<uint32_t>::max();
- for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
- if (d_buffer->d_readers[idx]->d_read_index > d_buffer->d_write_index) {
- // Record index of reader with minimum read-index
- if (d_buffer->d_readers[idx]->d_read_index < min_read_idx) {
- min_read_idx = d_buffer->d_readers[idx]->d_read_index;
- min_reader_index = idx;
- }
- }
- }
-
- // Note items_avail might be zero, that's okay.
- items_avail += d_read_index - min_read_idx;
- int gap = min_read_idx - d_buffer->d_write_index;
- if (items_avail > gap) {
- return false;
- }
-
-#ifdef BUFFER_DEBUG
- std::ostringstream msg;
- msg << "[" << d_buffer << ";" << this << "] "
- << "input_blocked_callback() WR_idx: " << d_buffer->d_write_index
- << " -- WR items: " << d_buffer->nitems_written()
- << " -- BUFSIZE: " << d_buffer->d_bufsize << " -- RD_idx: " << min_read_idx;
- for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
- if (idx != min_reader_index) {
- msg << " -- OTHER_RDR: " << d_buffer->d_readers[idx]->d_read_index;
- }
- }
-
- msg << " -- GAP: " << gap << " -- items_required: " << items_required
- << " -- items_avail: " << items_avail;
- GR_LOG_DEBUG(d_logger, msg.str());
-#endif
-
- // Shift existing data down to make room for blocked data at end of buffer
- uint32_t move_data_size = d_buffer->d_write_index * d_buffer->d_sizeof_item;
- char* dest = d_buffer->d_base + (items_avail * d_buffer->d_sizeof_item);
- std::memmove(dest, d_buffer->d_base, move_data_size);
-
- // Next copy the data from the end of the buffer back to the beginning
- uint32_t avail_data_size = items_avail * d_buffer->d_sizeof_item;
- char* src = d_buffer->d_base + (min_read_idx * d_buffer->d_sizeof_item);
- std::memcpy(d_buffer->d_base, src, avail_data_size);
-
- // Now adjust write pointer
- d_buffer->d_write_index += items_avail;
-
- // Finally adjust all reader pointers
- for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
- if (idx == min_reader_index) {
- d_buffer->d_readers[idx]->d_read_index = 0;
- } else {
- d_buffer->d_readers[idx]->d_read_index += items_avail;
- d_buffer->d_readers[idx]->d_read_index %= d_buffer->d_bufsize;
- }
- }
-
- return true;
- }
+ // Update items available before going farther as it could be stale
+ items_avail = items_available();
- return false;
+ return d_buffer->input_blocked_callback(items_required, items_avail, d_read_index);
}
buffer_reader_sm::buffer_reader_sm(buffer_sptr buffer,
diff --git a/gnuradio-runtime/lib/buffer_single_mapped.cc b/gnuradio-runtime/lib/buffer_single_mapped.cc
index 6024396557..db7959c5b4 100644
--- a/gnuradio-runtime/lib/buffer_single_mapped.cc
+++ b/gnuradio-runtime/lib/buffer_single_mapped.cc
@@ -19,6 +19,8 @@
#include <gnuradio/thread/thread.h>
#include <assert.h>
#include <algorithm>
+#include <cstdlib>
+#include <cstring>
#include <iostream>
#include <stdexcept>
@@ -27,30 +29,18 @@ namespace gr {
buffer_single_mapped::buffer_single_mapped(int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
block_sptr link,
block_sptr buf_owner)
- : buffer(BufferMappingType::SingleMapped,
+ : buffer(buffer_mapping_type::single_mapped,
nitems,
sizeof_item,
downstream_lcm_nitems,
+ downstream_max_out_mult,
link),
d_buf_owner(buf_owner),
- d_buffer(nullptr,
- std::bind(&buffer_single_mapped::deleter, this, std::placeholders::_1))
+ d_buffer(nullptr)
{
- gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_single_mapped");
- if (!allocate_buffer(nitems, sizeof_item, downstream_lcm_nitems))
- throw std::bad_alloc();
-
-#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
- {
- std::ostringstream msg;
- msg << "[" << this << "] "
- << "buffer_single_mapped constructor -- history: " << link->history();
- GR_LOG_DEBUG(d_logger, msg.str());
- }
-#endif
}
buffer_single_mapped::~buffer_single_mapped() {}
@@ -59,14 +49,21 @@ buffer_single_mapped::~buffer_single_mapped() {}
* Allocates underlying buffer.
* returns true iff successful.
*/
-bool buffer_single_mapped::allocate_buffer(int nitems,
- size_t sizeof_item,
- uint64_t downstream_lcm_nitems)
+bool buffer_single_mapped::allocate_buffer(int nitems)
{
#ifdef BUFFER_DEBUG
int orig_nitems = nitems;
#endif
+ // For single mapped buffers resize the initial size to be at least four
+ // times the size of the largest of any downstream block's output multiple.
+ // This helps reduce the number of times the input_block_callback() might be
+ // called which should help overall performance (particularly if the max
+ // output multiple is large) at the cost of slightly more buffer space.
+ if (static_cast<uint32_t>(nitems) < (4 * d_max_reader_output_multiple)) {
+ nitems = 4 * d_max_reader_output_multiple;
+ }
+
// Unlike the double mapped buffer case that can easily wrap back onto itself
// for both reads and writes the single mapped case needs to be aware of read
// and write granularity and size the underlying buffer accordingly. Otherwise
@@ -105,33 +102,37 @@ bool buffer_single_mapped::allocate_buffer(int nitems,
#endif
// Adjust size so output buffer size is a multiple of the write granularity
- if (write_granularity != 1 || downstream_lcm_nitems != 1) {
- uint64_t size_align_adjust = GR_LCM(write_granularity, downstream_lcm_nitems);
+ if (write_granularity != 1 || d_downstream_lcm_nitems != 1) {
+ uint64_t size_align_adjust = GR_LCM(write_granularity, d_downstream_lcm_nitems);
uint64_t remainder = nitems % size_align_adjust;
nitems += (remainder > 0) ? (size_align_adjust - remainder) : 0;
#ifdef BUFFER_DEBUG
std::ostringstream msg;
msg << "allocate_buffer()** called nitems: " << orig_nitems
- << " -- read_multiple: " << downstream_lcm_nitems
+ << " -- read_multiple: " << d_downstream_lcm_nitems
<< " -- write_multiple: " << write_granularity
<< " -- NEW nitems: " << nitems;
GR_LOG_DEBUG(d_logger, msg.str());
#endif
}
- // Allocate a new custom buffer from the owning block
- char* buf = buf_owner()->allocate_custom_buffer(nitems * sizeof_item);
- assert(buf != nullptr);
- d_buffer.reset(buf);
-
- d_base = d_buffer.get();
d_bufsize = nitems;
- d_downstream_lcm_nitems = downstream_lcm_nitems;
+ d_downstream_lcm_nitems = d_downstream_lcm_nitems;
d_write_multiple = write_granularity;
- return true;
+ // Do the actual allocation(s) with the finalized nitems
+ return do_allocate_buffer(nitems, d_sizeof_item);
+}
+
+bool buffer_single_mapped::input_blkd_cb_ready(int items_required,
+ unsigned int read_index)
+{
+ gr::thread::scoped_lock(*this->mutex());
+
+ return (((d_bufsize - read_index) < (uint32_t)items_required) &&
+ (d_write_index < read_index));
}
bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple)
@@ -145,70 +146,6 @@ bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple)
((space_avail / output_multiple) * output_multiple == 0));
}
-
-bool buffer_single_mapped::output_blocked_callback(int output_multiple, bool force)
-{
- uint32_t space_avail = space_available();
-
-#ifdef BUFFER_DEBUG
- std::ostringstream msg;
- msg << "[" << this << "] "
- << "output_blocked_callback()*** WR_idx: " << d_write_index
- << " -- WR items: " << nitems_written()
- << " -- output_multiple: " << output_multiple
- << " -- space_avail: " << space_avail << " -- force: " << force;
- GR_LOG_DEBUG(d_logger, msg.str());
-#endif
-
- if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) ||
- force) {
- // Find reader with the smallest read index
- uint32_t min_read_idx = d_readers[0]->d_read_index;
- for (size_t idx = 1; idx < d_readers.size(); ++idx) {
- // Record index of reader with minimum read-index
- if (d_readers[idx]->d_read_index < min_read_idx) {
- min_read_idx = d_readers[idx]->d_read_index;
- }
- }
-
-#ifdef BUFFER_DEBUG
- std::ostringstream msg;
- msg << "[" << this << "] "
- << "output_blocked_callback() WR_idx: " << d_write_index
- << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx
- << " -- shortcircuit: "
- << ((min_read_idx == 0) || (min_read_idx >= d_write_index))
- << " -- to_move_items: " << (d_write_index - min_read_idx)
- << " -- space_avail: " << space_avail << " -- force: " << force;
- GR_LOG_DEBUG(d_logger, msg.str());
-#endif
-
- // Make sure we have enough room to start writing back at the beginning
- if ((min_read_idx == 0) || (min_read_idx >= d_write_index)) {
- return false;
- }
-
- // Determine how much "to be read" data needs to be moved
- int to_move_items = d_write_index - min_read_idx;
- assert(to_move_items > 0);
- uint32_t to_move_bytes = to_move_items * d_sizeof_item;
-
- // Shift "to be read" data back to the beginning of the buffer
- std::memmove(d_base, d_base + (min_read_idx * d_sizeof_item), to_move_bytes);
-
- // Adjust write index and each reader index
- d_write_index -= min_read_idx;
-
- for (size_t idx = 0; idx < d_readers.size(); ++idx) {
- d_readers[idx]->d_read_index -= min_read_idx;
- }
-
- return true;
- }
-
- return false;
-}
-
int buffer_single_mapped::space_available()
{
if (d_readers.empty())
@@ -233,7 +170,7 @@ int buffer_single_mapped::space_available()
// For single mapped buffer there is no wrapping beyond the end of the
// buffer
#ifdef BUFFER_DEBUG
- int thecase = 4; // REMOVE ME - just for debug
+ int thecase = 0;
#endif
int space = d_bufsize - d_write_index;
@@ -259,15 +196,17 @@ int buffer_single_mapped::space_available()
#endif
space = min_read_index - d_write_index;
// Leave extra space in case the reader gets stuck and needs realignment
- {
- if ((d_write_index > (d_bufsize / 2)) ||
- (min_read_index < (d_bufsize / 2))) {
+ if (d_max_reader_output_multiple > 1) {
+ if (static_cast<uint32_t>(space) > d_max_reader_output_multiple) {
#ifdef BUFFER_DEBUG
- thecase = 17;
+ thecase = 4;
#endif
- space = 0;
+ space = space - d_max_reader_output_multiple;
} else {
- space = (d_bufsize / 2) - d_write_index;
+#ifdef BUFFER_DEBUG
+ thecase = 5;
+#endif
+ space = 0;
}
}
}
@@ -278,7 +217,6 @@ int buffer_single_mapped::space_available()
}
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
std::ostringstream msg;
msg << "[" << this << "] "
<< "space_available() called (case: " << thecase
@@ -286,6 +224,7 @@ int buffer_single_mapped::space_available()
<< " -- min_read_index: " << min_read_index << " ("
<< min_idx_reader->nitems_read() << ") "
<< " -- space: " << space
+ << " -- max_reader_out_mult: " << d_max_reader_output_multiple
<< " (sample delay: " << min_idx_reader->sample_delay() << ")";
GR_LOG_DEBUG(d_logger, msg.str());
#endif
@@ -294,4 +233,198 @@ int buffer_single_mapped::space_available()
}
}
+void buffer_single_mapped::update_reader_block_history(unsigned history, int delay)
+{
+ unsigned old_max = d_max_reader_history;
+ d_max_reader_history = std::max(d_max_reader_history, history);
+ if (d_max_reader_history != old_max) {
+ d_write_index = d_max_reader_history - 1;
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "buffer_single_mapped constructor -- set wr index to: " << d_write_index;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // Reset the reader's read index if the buffer's write index has changed.
+ // Note that "history - 1" is the nzero_preload value passed to
+ // buffer_add_reader.
+ for (auto reader : d_readers) {
+ reader->d_read_index = d_write_index - (reader->link()->history() - 1);
+ }
+ }
+
+ // Only attempt to set has history flag if it is not already set
+ if (!d_has_history) {
+ // Blocks that set delay may set history to delay + 1 but this is
+ // not "real" history
+ d_has_history = ((static_cast<int>(history) - 1) != delay);
+ }
+}
+
+//------------------------------------------------------------------------------
+
+bool buffer_single_mapped::input_blocked_callback_logic(int items_required,
+ int items_avail,
+ unsigned read_index,
+ char* buffer_ptr,
+ memcpy_func_t memcpy_func,
+ memmove_func_t memmove_func)
+{
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "input_blocked_callback() WR_idx: " << d_write_index
+ << " -- WR items: " << nitems_written() << " -- BUFSIZE: " << d_bufsize
+ << " -- RD_idx: " << read_index << " -- items_required: " << items_required
+ << " -- items_avail: " << items_avail;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // Maybe adjust read pointers from min read index?
+ // This would mean that *all* readers must be > (passed) the write index
+ if (((d_bufsize - read_index) < (uint32_t)items_required) &&
+ (d_write_index < read_index)) {
+
+ // Find reader with the smallest read index that is greater than the
+ // write index
+ uint32_t min_reader_index = std::numeric_limits<uint32_t>::max();
+ uint32_t min_read_idx = std::numeric_limits<uint32_t>::max();
+ for (size_t idx = 0; idx < d_readers.size(); ++idx) {
+ if (d_readers[idx]->d_read_index > d_write_index) {
+ // Record index of reader with minimum read-index
+ if (d_readers[idx]->d_read_index < min_read_idx) {
+ min_read_idx = d_readers[idx]->d_read_index;
+ min_reader_index = idx;
+ }
+ }
+ }
+
+ // Note items_avail might be zero, that's okay.
+ items_avail += read_index - min_read_idx;
+ int gap = min_read_idx - d_write_index;
+ if (items_avail > gap) {
+ return false;
+ }
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "input_blocked_callback() WR_idx: " << d_write_index
+ << " -- WR items: " << nitems_written() << " -- BUFSIZE: " << d_bufsize
+ << " -- RD_idx: " << min_read_idx;
+ for (size_t idx = 0; idx < d_readers.size(); ++idx) {
+ if (idx != min_reader_index) {
+ msg << " -- OTHER_RDR: " << d_readers[idx]->d_read_index;
+ }
+ }
+
+ msg << " -- GAP: " << gap << " -- items_required: " << items_required
+ << " -- items_avail: " << items_avail;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // Shift existing data down to make room for blocked data at end of buffer
+ uint32_t move_data_size = d_write_index * d_sizeof_item;
+ char* dest = buffer_ptr + (items_avail * d_sizeof_item);
+ memmove_func(dest, buffer_ptr, move_data_size);
+
+ // Next copy the data from the end of the buffer back to the beginning
+ uint32_t avail_data_size = items_avail * d_sizeof_item;
+ char* src = buffer_ptr + (min_read_idx * d_sizeof_item);
+ memcpy_func(buffer_ptr, src, avail_data_size);
+
+ // Now adjust write pointer
+ d_write_index += items_avail;
+
+ // Finally adjust all reader pointers
+ for (size_t idx = 0; idx < d_readers.size(); ++idx) {
+ if (idx == min_reader_index) {
+ d_readers[idx]->d_read_index = 0;
+ } else {
+ d_readers[idx]->d_read_index += items_avail;
+ d_readers[idx]->d_read_index %= d_bufsize;
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+}
+
+bool buffer_single_mapped::output_blocked_callback_logic(int output_multiple,
+ bool force,
+ char* buffer_ptr,
+ memmove_func_t memmove_func)
+{
+ uint32_t space_avail = space_available();
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "output_blocked_callback()*** WR_idx: " << d_write_index
+ << " -- WR items: " << nitems_written()
+ << " -- output_multiple: " << output_multiple
+ << " -- space_avail: " << space_avail << " -- force: " << force;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) ||
+ force) {
+ // Find reader with the smallest read index
+ uint32_t min_read_idx = d_readers[0]->d_read_index;
+ uint64_t min_read_idx_nitems = d_readers[0]->nitems_read();
+ for (size_t idx = 1; idx < d_readers.size(); ++idx) {
+ // Record index of reader with minimum read-index
+ if (d_readers[idx]->d_read_index < min_read_idx) {
+ min_read_idx = d_readers[idx]->d_read_index;
+ min_read_idx_nitems = d_readers[idx]->nitems_read();
+ }
+ }
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "output_blocked_callback() WR_idx: " << d_write_index
+ << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx
+ << " -- RD items: " << min_read_idx_nitems << " -- shortcircuit: "
+ << ((min_read_idx == 0) || (min_read_idx > d_write_index) ||
+ (min_read_idx == d_write_index &&
+ min_read_idx_nitems != nitems_written()))
+ << " -- to_move_items: " << (d_write_index - min_read_idx)
+ << " -- space_avail: " << space_avail << " -- force: " << force;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // Make sure we have enough room to start writing back at the beginning
+ if ((min_read_idx == 0) || (min_read_idx > d_write_index) ||
+ (min_read_idx == d_write_index && min_read_idx_nitems != nitems_written())) {
+ return false;
+ }
+
+ // Determine how much "to be read" data needs to be moved
+ int to_move_items = d_write_index - min_read_idx;
+ if (to_move_items > 0) {
+ uint32_t to_move_bytes = to_move_items * d_sizeof_item;
+
+ // Shift "to be read" data back to the beginning of the buffer
+ memmove_func(
+ buffer_ptr, buffer_ptr + (min_read_idx * d_sizeof_item), to_move_bytes);
+ }
+
+ // Adjust write index and each reader index
+ d_write_index -= min_read_idx;
+
+ for (size_t idx = 0; idx < d_readers.size(); ++idx) {
+ d_readers[idx]->d_read_index -= min_read_idx;
+ }
+
+ return true;
+ }
+
+ return false;
+}
+
} /* namespace gr */
diff --git a/gnuradio-runtime/lib/buffer_type.cc b/gnuradio-runtime/lib/buffer_type.cc
deleted file mode 100644
index b667eded1e..0000000000
--- a/gnuradio-runtime/lib/buffer_type.cc
+++ /dev/null
@@ -1,18 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2020 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-#include <gnuradio/buffer_type.h>
-
-namespace gr {
-
-uint32_t buffer_type_base::s_nextId = 0;
-std::mutex buffer_type_base::s_mutex;
-
-
-} /* namespace gr */ \ No newline at end of file
diff --git a/gnuradio-runtime/lib/flat_flowgraph.cc b/gnuradio-runtime/lib/flat_flowgraph.cc
index b9986f8370..ef96ef21ac 100644
--- a/gnuradio-runtime/lib/flat_flowgraph.cc
+++ b/gnuradio-runtime/lib/flat_flowgraph.cc
@@ -15,6 +15,7 @@
#include "flat_flowgraph.h"
#include <gnuradio/block_detail.h>
#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_double_mapped.h>
#include <gnuradio/buffer_reader.h>
#include <gnuradio/buffer_type.h>
#include <gnuradio/integer_math.h>
@@ -87,6 +88,7 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
// Determine the downstream max per output port
std::vector<int> downstream_max_nitems(noutputs, 0);
std::vector<uint64_t> downstream_lcm_nitems(noutputs, 1);
+ std::vector<uint32_t> downstream_max_out_mult(noutputs, 1);
#ifdef BUFFER_DEBUG
std::ostringstream msg;
@@ -96,6 +98,7 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
for (int i = 0; i < noutputs; i++) {
int nitems = 0;
uint64_t lcm_nitems = 1;
+ uint32_t max_out_multiple = 1;
basic_block_vector_t downstream_blocks = calc_downstream_blocks(grblock, i);
for (basic_block_viter_t blk = downstream_blocks.begin();
blk != downstream_blocks.end();
@@ -116,8 +119,8 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
double decimation = (1.0 / dgrblock->relative_rate());
int multiple = dgrblock->output_multiple();
int history = dgrblock->history();
- nitems =
- std::max(nitems, static_cast<int>(2 * (decimation * multiple + history)));
+ nitems = std::max(
+ nitems, static_cast<int>(2 * (decimation * multiple + (history - 1))));
// Calculate the LCM of downstream reader nitems
#ifdef BUFFER_DEBUG
@@ -131,6 +134,7 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
(uint64_t)(dgrblock->fixed_rate_noutput_to_ninput(1) -
(dgrblock->history() - 1)));
}
+
if (dgrblock->relative_rate() != 1.0) {
// Relative rate
lcm_nitems = GR_LCM(lcm_nitems, dgrblock->relative_rate_d());
@@ -141,6 +145,10 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
lcm_nitems = 1;
}
+ if (static_cast<uint32_t>(multiple) > max_out_multiple) {
+ max_out_multiple = multiple;
+ }
+
#ifdef BUFFER_DEBUG
msg.str("");
msg << " NINPUT_ITEMS: " << nitems;
@@ -161,11 +169,15 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
}
downstream_max_nitems[i] = nitems;
downstream_lcm_nitems[i] = lcm_nitems;
+ downstream_max_out_mult[i] = max_out_multiple;
}
// Allocate the block detail and necessary buffers
- grblock->allocate_detail(
- ninputs, noutputs, downstream_max_nitems, downstream_lcm_nitems);
+ grblock->allocate_detail(ninputs,
+ noutputs,
+ downstream_max_nitems,
+ downstream_lcm_nitems,
+ downstream_max_out_mult);
}
void flat_flowgraph::connect_block_inputs(basic_block_sptr block)
@@ -189,26 +201,46 @@ void flat_flowgraph::connect_block_inputs(basic_block_sptr block)
if (!src_grblock)
throw std::runtime_error("connect_block_inputs found non-gr::block");
+ // In order to determine the buffer context, we need to examine both
+ // the upstream and the downstream buffer_types
+ buffer_type src_buf_type =
+ src_grblock->output_signature()->stream_buffer_type(src_port);
+ buffer_type dest_buf_type =
+ grblock->input_signature()->stream_buffer_type(dst_port);
+
+ buffer_context context;
+ if (src_buf_type == buffer_double_mapped::type &&
+ dest_buf_type == buffer_double_mapped::type) {
+ context = buffer_context::HOST_TO_HOST;
+ } else if (src_buf_type != buffer_double_mapped::type &&
+ dest_buf_type == buffer_double_mapped::type) {
+ context = buffer_context::DEVICE_TO_HOST;
+ } else if (src_buf_type == buffer_double_mapped::type &&
+ dest_buf_type != buffer_double_mapped::type) {
+ context = buffer_context::HOST_TO_DEVICE;
+ } else if (src_buf_type != buffer_double_mapped::type &&
+ dest_buf_type != buffer_double_mapped::type) {
+ context = buffer_context::DEVICE_TO_DEVICE;
+ }
+
buffer_sptr src_buffer;
- buffer_type_t src_buf_type = src_grblock->get_buffer_type();
- buffer_type_t dest_buf_type = grblock->get_buffer_type();
- if (dest_buf_type == buftype_DEFAULT_NON_CUSTOM::get() ||
+ if (dest_buf_type == buffer_double_mapped::type ||
dest_buf_type == src_buf_type) {
// The block is not using a custom buffer OR the block and the upstream
// block both use the same kind of custom buffer
src_buffer = src_grblock->detail()->output(src_port);
} else {
- if (dest_buf_type != buftype_DEFAULT_NON_CUSTOM::get() &&
- src_buf_type == buftype_DEFAULT_NON_CUSTOM::get()) {
+ if (dest_buf_type != buffer_double_mapped::type &&
+ src_buf_type == buffer_double_mapped::type) {
// The block uses a custom buffer but the upstream block does not
// therefore the upstream block's buffer can be replaced with the
// type of buffer that the block needs
std::ostringstream msg;
msg << "Block: " << grblock->identifier()
- << "replacing upstream block: " << src_grblock->identifier()
+ << " replacing upstream block: " << src_grblock->identifier()
<< " buffer with a custom buffer";
- GR_LOG_DEBUG(d_debug_logger, msg.str());
- src_buffer = src_grblock->replace_buffer(src_port, grblock);
+ GR_LOG_DEBUG(d_logger, msg.str());
+ src_buffer = src_grblock->replace_buffer(src_port, dst_port, grblock);
} else {
// Both the block and upstream block use incompatible buffer types
// which is not currently allowed
@@ -223,9 +255,13 @@ void flat_flowgraph::connect_block_inputs(basic_block_sptr block)
}
}
- GR_LOG_DEBUG(d_debug_logger,
- "Setting input " + std::to_string(dst_port) + " from edge " +
- (*e).identifier());
+ // Set buffer's context
+ src_buffer->set_context(context);
+
+ std::ostringstream msg;
+ msg << "Setting input " << dst_port << " from edge " << (*e).identifier()
+ << " context: " << context;
+ GR_LOG_DEBUG(d_debug_logger, msg.str());
detail->set_input(dst_port,
buffer_add_reader(src_buffer,
@@ -341,7 +377,6 @@ void flat_flowgraph::merge_connections(flat_flowgraph_sptr old_ffg)
// Now deal with the fact that the block details might have
// changed numbers of inputs and outputs vs. in the old
// flowgraph.
-
block->detail()->reset_nitem_counters();
block->detail()->clear_tags();
}
diff --git a/gnuradio-runtime/lib/host_buffer.cc b/gnuradio-runtime/lib/host_buffer.cc
new file mode 100644
index 0000000000..6e1a1d1003
--- /dev/null
+++ b/gnuradio-runtime/lib/host_buffer.cc
@@ -0,0 +1,255 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2021 BlackLynx Inc..
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+#include <cstring>
+#include <sstream>
+#include <stdexcept>
+
+#include <gnuradio/block.h>
+#include <gnuradio/host_buffer.h>
+
+namespace gr {
+
+buffer_type host_buffer::type(buftype_HOST_BUFFER{});
+
+void* host_buffer::device_memcpy(void* dest, const void* src, std::size_t count)
+{
+ // There is no spoon...er... device so fake it out using regular memcpy
+ return std::memcpy(dest, src, count);
+}
+
+void* host_buffer::device_memmove(void* dest, const void* src, std::size_t count)
+{
+ // There is no spoon...er... device so fake it out using regular memmmove
+ return std::memmove(dest, src, count);
+}
+
+
+host_buffer::host_buffer(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
+ block_sptr link,
+ block_sptr buf_owner)
+ : buffer_single_mapped(nitems,
+ sizeof_item,
+ downstream_lcm_nitems,
+ downstream_max_out_mult,
+ link,
+ buf_owner),
+ d_device_base(nullptr)
+{
+ gr::configure_default_loggers(d_logger, d_debug_logger, "host_buffer");
+ if (!allocate_buffer(nitems))
+ throw std::bad_alloc();
+}
+
+host_buffer::~host_buffer() {}
+
+void host_buffer::post_work(int nitems)
+{
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "host_buffer [" << d_context << "] -- post_work: " << nitems;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ if (nitems <= 0) {
+ return;
+ }
+
+ // NOTE: when this function is called the write pointer has not yet been
+ // advanced so it can be used directly as the source ptr
+ switch (d_context) {
+ case buffer_context::HOST_TO_DEVICE: {
+ // Copy data from host buffer to device buffer
+ void* dest_ptr = &d_device_base[d_write_index * d_sizeof_item];
+ device_memcpy(dest_ptr, write_pointer(), nitems * d_sizeof_item);
+ } break;
+
+ case buffer_context::DEVICE_TO_HOST: {
+ // Copy data from device buffer to host buffer
+ void* dest_ptr = &d_base[d_write_index * d_sizeof_item];
+ device_memcpy(dest_ptr, write_pointer(), nitems * d_sizeof_item);
+ } break;
+
+ case buffer_context::DEVICE_TO_DEVICE:
+ // No op
+ break;
+
+ default:
+ std::ostringstream msg;
+ msg << "Unexpected context for host_buffer: " << d_context;
+ GR_LOG_ERROR(d_logger, msg.str());
+ throw std::runtime_error(msg.str());
+ }
+}
+
+bool host_buffer::do_allocate_buffer(size_t final_nitems, size_t sizeof_item)
+{
+#ifdef BUFFER_DEBUG
+ {
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "host_buffer constructor -- nitems: " << final_nitems;
+ GR_LOG_DEBUG(d_logger, msg.str());
+ }
+#endif
+
+ // This is the host buffer
+ d_buffer.reset(new char[final_nitems * sizeof_item]());
+ d_base = d_buffer.get();
+
+ // This is the simulated device buffer
+ d_device_buf.reset(new char[final_nitems * sizeof_item]());
+ d_device_base = d_device_buf.get();
+
+ return true;
+}
+
+void* host_buffer::write_pointer()
+{
+ void* ptr = nullptr;
+ switch (d_context) {
+ case buffer_context::HOST_TO_DEVICE:
+ // Write into host buffer
+ ptr = &d_base[d_write_index * d_sizeof_item];
+ break;
+
+ case buffer_context::DEVICE_TO_HOST:
+ case buffer_context::DEVICE_TO_DEVICE:
+ // Write into "device" buffer
+ ptr = &d_device_base[d_write_index * d_sizeof_item];
+ break;
+
+ default:
+ std::ostringstream msg;
+ msg << "Unexpected context for host_buffer: " << d_context;
+ GR_LOG_ERROR(d_logger, msg.str());
+ throw std::runtime_error(msg.str());
+ }
+
+ return ptr;
+}
+
+const void* host_buffer::_read_pointer(unsigned int read_index)
+{
+ void* ptr = nullptr;
+ switch (d_context) {
+ case buffer_context::HOST_TO_DEVICE:
+ case buffer_context::DEVICE_TO_DEVICE:
+ // Read from "device" buffer
+ ptr = &d_device_base[read_index * d_sizeof_item];
+ break;
+
+ case buffer_context::DEVICE_TO_HOST:
+ // Read from host buffer
+ ptr = &d_base[read_index * d_sizeof_item];
+ break;
+
+ default:
+ std::ostringstream msg;
+ msg << "Unexpected context for host_buffer: " << d_context;
+ GR_LOG_ERROR(d_logger, msg.str());
+ throw std::runtime_error(msg.str());
+ }
+
+ return ptr;
+}
+
+bool host_buffer::input_blocked_callback(int items_required,
+ int items_avail,
+ unsigned read_index)
+{
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "host_buffer [" << d_context << "] -- input_blocked_callback";
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ bool rc = false;
+ switch (d_context) {
+ case buffer_context::HOST_TO_DEVICE:
+ case buffer_context::DEVICE_TO_DEVICE:
+ // Adjust "device" buffer
+ rc = input_blocked_callback_logic(items_required,
+ items_avail,
+ read_index,
+ d_device_base,
+ host_buffer::device_memcpy,
+ host_buffer::device_memmove);
+ break;
+
+ case buffer_context::DEVICE_TO_HOST:
+ case buffer_context::HOST_TO_HOST:
+ // Adjust host buffer
+ rc = input_blocked_callback_logic(
+ items_required, items_avail, read_index, d_base, std::memcpy, std::memmove);
+ break;
+
+ default:
+ std::ostringstream msg;
+ msg << "Unexpected context for host_buffer: " << d_context;
+ GR_LOG_ERROR(d_logger, msg.str());
+ throw std::runtime_error(msg.str());
+ }
+
+ return rc;
+}
+
+bool host_buffer::output_blocked_callback(int output_multiple, bool force)
+{
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "host_buffer [" << d_context << "] -- output_blocked_callback";
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ bool rc = false;
+ switch (d_context) {
+ case buffer_context::HOST_TO_DEVICE:
+ case buffer_context::HOST_TO_HOST:
+ // Adjust host buffer
+ rc = output_blocked_callback_logic(output_multiple, force, d_base, std::memmove);
+ break;
+
+ case buffer_context::DEVICE_TO_HOST:
+ case buffer_context::DEVICE_TO_DEVICE:
+ // Adjust "device" buffer
+ rc = output_blocked_callback_logic(
+ output_multiple, force, d_device_base, host_buffer::device_memmove);
+ break;
+
+ default:
+ std::ostringstream msg;
+ msg << "Unexpected context for host_buffer: " << d_context;
+ GR_LOG_ERROR(d_logger, msg.str());
+ throw std::runtime_error(msg.str());
+ }
+
+ return rc;
+}
+
+buffer_sptr host_buffer::make_host_buffer(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
+ block_sptr link,
+ block_sptr buf_owner)
+{
+ return buffer_sptr(new host_buffer(nitems,
+ sizeof_item,
+ downstream_lcm_nitems,
+ downstream_max_out_mult,
+ link,
+ buf_owner));
+}
+
+} /* namespace gr */
diff --git a/gnuradio-runtime/lib/io_signature.cc b/gnuradio-runtime/lib/io_signature.cc
index 507268d5ce..e953c7eb5a 100644
--- a/gnuradio-runtime/lib/io_signature.cc
+++ b/gnuradio-runtime/lib/io_signature.cc
@@ -22,47 +22,69 @@ gr::io_signature::sptr io_signature::makev(int min_streams,
int max_streams,
const std::vector<int>& sizeof_stream_items)
{
+ gr_vector_buffer_type buftypes(sizeof_stream_items.size(),
+ buffer_double_mapped::type);
return gr::io_signature::sptr(
- new io_signature(min_streams, max_streams, sizeof_stream_items));
+ new io_signature(min_streams, max_streams, sizeof_stream_items, buftypes));
}
-gr::io_signature::sptr
-io_signature::make(int min_streams, int max_streams, int sizeof_stream_item)
+gr::io_signature::sptr io_signature::makev(int min_streams,
+ int max_streams,
+ const std::vector<int>& sizeof_stream_items,
+ gr_vector_buffer_type buftypes)
{
- std::vector<int> sizeof_items(1);
- sizeof_items[0] = sizeof_stream_item;
- return io_signature::makev(min_streams, max_streams, sizeof_items);
+ return gr::io_signature::sptr(
+ new io_signature(min_streams, max_streams, sizeof_stream_items, buftypes));
+}
+
+gr::io_signature::sptr io_signature::make(int min_streams,
+ int max_streams,
+ int sizeof_stream_item,
+ buffer_type buftype)
+{
+ std::vector<int> sizeof_items{ sizeof_stream_item };
+ gr_vector_buffer_type buftypes{ buftype };
+ return io_signature::makev(min_streams, max_streams, sizeof_items, buftypes);
}
gr::io_signature::sptr io_signature::make2(int min_streams,
int max_streams,
int sizeof_stream_item1,
- int sizeof_stream_item2)
+ int sizeof_stream_item2,
+ buffer_type buftype1,
+ buffer_type buftype2)
{
- std::vector<int> sizeof_items(2);
- sizeof_items[0] = sizeof_stream_item1;
- sizeof_items[1] = sizeof_stream_item2;
- return io_signature::makev(min_streams, max_streams, sizeof_items);
+ std::vector<int> sizeof_items{ sizeof_stream_item1, sizeof_stream_item2 };
+ gr_vector_buffer_type buftypes{ buftype1, buftype2 };
+ return io_signature::makev(min_streams, max_streams, sizeof_items, buftypes);
}
gr::io_signature::sptr io_signature::make3(int min_streams,
int max_streams,
int sizeof_stream_item1,
int sizeof_stream_item2,
- int sizeof_stream_item3)
+ int sizeof_stream_item3,
+ buffer_type buftype1,
+ buffer_type buftype2,
+ buffer_type buftype3)
{
- std::vector<int> sizeof_items(3);
- sizeof_items[0] = sizeof_stream_item1;
- sizeof_items[1] = sizeof_stream_item2;
- sizeof_items[2] = sizeof_stream_item3;
- return io_signature::makev(min_streams, max_streams, sizeof_items);
+ std::vector<int> sizeof_items{ sizeof_stream_item1,
+ sizeof_stream_item2,
+ sizeof_stream_item3 };
+ gr_vector_buffer_type buftypes{ buftype1, buftype2, buftype3 };
+ return io_signature::makev(min_streams, max_streams, sizeof_items, buftypes);
}
// ------------------------------------------------------------------------
io_signature::io_signature(int min_streams,
int max_streams,
- const std::vector<int>& sizeof_stream_items)
+ const std::vector<int>& sizeof_stream_items,
+ gr_vector_buffer_type buftypes)
+ : d_min_streams(min_streams),
+ d_max_streams(max_streams),
+ d_sizeof_stream_item(sizeof_stream_items),
+ d_stream_buffer_type(buftypes)
{
if (min_streams < 0 || (max_streams != IO_INFINITE && max_streams < min_streams))
throw std::invalid_argument("gr::io_signature(1)");
@@ -75,10 +97,6 @@ io_signature::io_signature(int min_streams,
if (max_streams != 0 && sizeof_stream_items[i] < 1)
throw std::invalid_argument("gr::io_signature(3)");
}
-
- d_min_streams = min_streams;
- d_max_streams = max_streams;
- d_sizeof_stream_item = sizeof_stream_items;
}
io_signature::~io_signature() {}
@@ -97,4 +115,14 @@ std::vector<int> io_signature::sizeof_stream_items() const
return d_sizeof_stream_item;
}
+buffer_type io_signature::stream_buffer_type(size_t index) const
+{
+ return d_stream_buffer_type[std::min(index, d_stream_buffer_type.size() - 1)];
+}
+
+gr_vector_buffer_type io_signature::stream_buffer_types() const
+{
+ return d_stream_buffer_type;
+}
+
} /* namespace gr */
diff --git a/gnuradio-runtime/lib/qa_buffer.cc b/gnuradio-runtime/lib/qa_buffer.cc
index cefe548338..1c97fd070d 100644
--- a/gnuradio-runtime/lib/qa_buffer.cc
+++ b/gnuradio-runtime/lib/qa_buffer.cc
@@ -43,7 +43,7 @@ static void t0_body()
int counter = 0;
gr::buffer_sptr buf(
- gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
+ gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr()));
int last_sa;
int sa;
@@ -78,7 +78,7 @@ static void t1_body()
int read_counter = 0;
gr::buffer_sptr buf(
- gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
+ gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr()));
gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr()));
int sa;
@@ -150,7 +150,7 @@ static void t2_body()
int nitems = (64 * (1L << 10)) / sizeof(int); // 64K worth of ints
gr::buffer_sptr buf(
- gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
+ gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr()));
gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr()));
int read_counter = 0;
@@ -216,7 +216,7 @@ static void t3_body()
static const int N = 5;
gr::buffer_sptr buf(
- gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
+ gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr()));
gr::buffer_reader_sptr reader[N];
int read_counter[N];
int write_counter = 0;
diff --git a/gnuradio-runtime/lib/qa_host_buffer.cc b/gnuradio-runtime/lib/qa_host_buffer.cc
new file mode 100644
index 0000000000..e8f29afe8d
--- /dev/null
+++ b/gnuradio-runtime/lib/qa_host_buffer.cc
@@ -0,0 +1,334 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2021 BlackLynx Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <gnuradio/block.h>
+#include <gnuradio/buffer_context.h>
+#include <gnuradio/buffer_reader.h>
+#include <gnuradio/host_buffer.h>
+#include <gnuradio/random.h>
+#include <boost/test/unit_test.hpp>
+#include <cstdlib>
+#include <iostream>
+
+// This is a trivial mocked up block inspired by gr::blocks::nop that is used
+// only as a placeholder for testing host_buffer below.
+class nop : public gr::block
+{
+public:
+ typedef std::shared_ptr<nop> sptr;
+ static sptr make(size_t sizeof_stream_item)
+ {
+ return gnuradio::make_block_sptr<nop>(sizeof_stream_item);
+ }
+
+ nop(size_t sizeof_stream_item)
+ : block("nop",
+ gr::io_signature::make(0, -1, sizeof_stream_item),
+ gr::io_signature::make(0, -1, sizeof_stream_item))
+ {
+ }
+
+ ~nop() override {}
+
+ int general_work(int noutput_items,
+ gr_vector_int& ninput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items) override
+ {
+ // eat any input that's available
+ for (unsigned i = 0; i < ninput_items.size(); i++)
+ consume(i, ninput_items[i]);
+
+ return noutput_items;
+ }
+};
+
+
+// ----------------------------------------------------------------------------
+// Basic checks for buffer_single_mapped using the host_buffer implementation
+// of the interface for testing.
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t0)
+{
+ int nitems = 65536 / sizeof(int);
+ gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+ gr::buffer_sptr buf(
+ gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+ buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+ gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+
+ BOOST_CHECK(buf->space_available() == nitems);
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ for (int idx = 1; idx <= 16; ++idx) {
+ buf->update_write_pointer(1000);
+ BOOST_CHECK(buf->space_available() == (nitems - (idx * 1000)));
+
+ BOOST_CHECK(rdr1->items_available() == (idx * 1000));
+ }
+
+ BOOST_CHECK(buf->space_available() == 384);
+
+ buf->update_write_pointer(buf->space_available());
+ BOOST_CHECK(buf->space_available() == 0);
+ BOOST_CHECK(rdr1->items_available() == nitems);
+ BOOST_CHECK(buf->space_available() == 0);
+}
+
+// ----------------------------------------------------------------------------
+// Basic checks for buffer_single_mapped using the host_buffer implementation
+// of the interface for testing.
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t1)
+{
+ int nitems = 65536 / sizeof(int);
+ gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+ gr::buffer_sptr buf(
+ gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+ buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+ gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+
+ int space = buf->space_available();
+ BOOST_CHECK(nitems == space);
+
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ buf->update_write_pointer(nitems);
+ BOOST_CHECK(buf->space_available() == 0);
+ BOOST_CHECK(rdr1->items_available() == nitems);
+
+ for (int idx = 1; idx <= 16; ++idx) {
+ rdr1->update_read_pointer(1000);
+ BOOST_CHECK(rdr1->items_available() == (nitems - (idx * 1000)));
+
+ space = buf->space_available();
+ BOOST_CHECK(space == (idx * 1000));
+ }
+
+ BOOST_CHECK(rdr1->items_available() == 384);
+ rdr1->update_read_pointer(384);
+ BOOST_CHECK(rdr1->items_available() == 0);
+}
+
+// ----------------------------------------------------------------------------
+// Basic check reader/write wrapping of buffer_single_mapped with 1 reader.
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t2)
+{
+ int nitems = 65536 / sizeof(int);
+ gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+ gr::buffer_sptr buf(
+ gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+ buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+ gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+
+ int space = buf->space_available();
+ BOOST_CHECK(nitems == space);
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ buf->update_write_pointer(nitems);
+ BOOST_CHECK(buf->space_available() == 0);
+
+ for (int idx = 1; idx <= 16; ++idx) {
+ rdr1->update_read_pointer(1000);
+ BOOST_CHECK(rdr1->items_available() == (nitems - (idx * 1000)));
+
+ space = buf->space_available();
+
+ if (idx <= 9)
+ BOOST_CHECK(space == (idx * 1000));
+ else
+ BOOST_CHECK(space == ((idx * 1000) - (nitems / 2)));
+
+ if (idx == 9) {
+ buf->update_write_pointer(nitems / 2);
+ }
+ }
+
+ // At this point we can only read up until the end of the buffer even though
+ // additional data is available at the beginning of the buffer
+ BOOST_CHECK(rdr1->items_available() == 384);
+ rdr1->update_read_pointer(384);
+
+ // Now the (nitems / 2) at the beginning of the buffer should be available
+ BOOST_CHECK(rdr1->items_available() == (nitems / 2));
+
+ for (int idx = 0; idx < 4; ++idx)
+ rdr1->update_read_pointer(1024);
+
+ BOOST_CHECK(buf->space_available() == (nitems / 2));
+ BOOST_CHECK(rdr1->items_available() == (nitems / 4));
+
+ for (int idx = 0; idx < 4; ++idx)
+ rdr1->update_read_pointer(1000);
+
+ BOOST_CHECK(buf->space_available() == (nitems / 2));
+ BOOST_CHECK(rdr1->items_available() == 96);
+
+ rdr1->update_read_pointer(96);
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ BOOST_CHECK(buf->space_available() == (nitems / 2));
+}
+
+// ----------------------------------------------------------------------------
+// Basic check reader/write wrapping of buffer_single_mapped with 2 readers.
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t3)
+{
+ int nitems = 65536 / sizeof(int);
+ gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+ gr::buffer_sptr buf(
+ gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+ buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+ gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+ gr::buffer_reader_sptr rdr2(gr::buffer_add_reader(buf, 0, nop));
+
+ int space = buf->space_available();
+ BOOST_CHECK(nitems == space);
+ BOOST_CHECK(rdr1->items_available() == 0);
+ BOOST_CHECK(rdr2->items_available() == 0);
+
+ buf->update_write_pointer(nitems);
+ BOOST_CHECK(buf->space_available() == 0);
+ BOOST_CHECK(rdr1->items_available() == nitems);
+ BOOST_CHECK(rdr2->items_available() == nitems);
+
+ for (int idx = 1; idx <= 16; ++idx) {
+ rdr1->update_read_pointer(1000);
+ BOOST_CHECK(rdr1->items_available() == (nitems - (idx * 1000)));
+
+ // Reader 2 hasn't read anything so space available should remain 0
+ BOOST_CHECK(buf->space_available() == 0);
+ }
+
+ int last_rdr1_available = rdr1->items_available();
+ int increment = last_rdr1_available / 4;
+
+ for (int idx = 1; idx <= 16; ++idx) {
+ rdr2->update_read_pointer(1000);
+ BOOST_CHECK(rdr2->items_available() == (nitems - (idx * 1000)));
+
+ BOOST_CHECK(rdr1->items_available() == last_rdr1_available);
+ if (idx % 4 == 0) {
+ rdr1->update_read_pointer(increment);
+ BOOST_CHECK(rdr1->items_available() == (last_rdr1_available - increment));
+ last_rdr1_available = rdr1->items_available();
+ }
+
+ BOOST_CHECK(buf->space_available() == (idx * 1000));
+ }
+}
+
+// ----------------------------------------------------------------------------
+// Basic check of output blocked callback
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t4)
+{
+ int nitems = 65536 / sizeof(int);
+ gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+ gr::buffer_sptr buf(
+ gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+ buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+ gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+
+ BOOST_CHECK(nitems == buf->space_available());
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ buf->update_write_pointer(nitems / 2);
+ BOOST_CHECK(buf->space_available() == (nitems / 2));
+ BOOST_CHECK(rdr1->items_available() == (nitems / 2));
+
+ rdr1->update_read_pointer(nitems / 2);
+ BOOST_CHECK(buf->space_available() == (nitems / 2));
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ buf->update_write_pointer(8000);
+ BOOST_CHECK(buf->space_available() == 192);
+
+ bool ready = buf->output_blkd_cb_ready(200);
+ BOOST_CHECK(ready == true);
+
+ bool success = buf->output_blocked_callback(200);
+ BOOST_CHECK(success == true);
+ BOOST_CHECK(buf->space_available() == 8384);
+ BOOST_CHECK(rdr1->items_available() == 8000);
+
+ rdr1->update_read_pointer(4000);
+ BOOST_CHECK(buf->space_available() == 8384);
+ BOOST_CHECK(rdr1->items_available() == 4000);
+
+ buf->update_write_pointer(4000);
+ BOOST_CHECK(buf->space_available() == 4384);
+ BOOST_CHECK(rdr1->items_available() == 8000);
+
+ rdr1->update_read_pointer(8000);
+ BOOST_CHECK(buf->space_available() == 4384);
+ BOOST_CHECK(rdr1->items_available() == 0);
+}
+
+// ----------------------------------------------------------------------------
+// Basic check of input blocked callback
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t5)
+{
+ int nitems = 65536 / sizeof(int);
+ gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+ gr::buffer_sptr buf(
+ gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+ buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+ gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+
+ BOOST_CHECK(nitems == buf->space_available());
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ buf->update_write_pointer(16000);
+ BOOST_CHECK(buf->space_available() == 384);
+ BOOST_CHECK(rdr1->items_available() == 16000);
+
+ rdr1->update_read_pointer(16000);
+ BOOST_CHECK(buf->space_available() == 384);
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ buf->update_write_pointer(384);
+ BOOST_CHECK(buf->space_available() == 16000);
+ BOOST_CHECK(rdr1->items_available() == 384);
+
+ buf->update_write_pointer(116);
+ BOOST_CHECK(buf->space_available() == 15884);
+ BOOST_CHECK(rdr1->items_available() == 384);
+
+ bool ready = rdr1->input_blkd_cb_ready(400);
+ BOOST_CHECK(ready == true);
+
+ bool success = rdr1->input_blocked_callback(400, rdr1->items_available());
+ BOOST_CHECK(success == true);
+ BOOST_CHECK(rdr1->items_available() == 500);
+
+ rdr1->update_read_pointer(500);
+ BOOST_CHECK(buf->space_available() == 15884);
+ BOOST_CHECK(rdr1->items_available() == 0);
+}