summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/lib/buffer.cc
diff options
context:
space:
mode:
authorDavid Sorber <david.sorber@blacklynx.tech>2021-05-12 08:59:21 -0400
committermormj <34754695+mormj@users.noreply.github.com>2021-10-25 11:27:01 -0400
commit788827ae116bef871e144abd39b1e4482208eabe (patch)
treedcfee04a77db5bb3c8042be5b0b95c54bf8759c9 /gnuradio-runtime/lib/buffer.cc
parentb8713810a2d07ac1a632bd7bfb23f3f48f67e222 (diff)
runtime: Custom Buffer/Accelerator Device Support - Milestone 1
Custom Buffer/Accelerator Device Support - Milestone 1 changes: * Refactored existing single mapped buffer code and created single mapped buffer abstraction; wrapping within single mapped buffers is handled explicitly by input blocked and output blocked callbacks that are called from block_executor * Added simple custom buffer allocation interface (NOTE: this interface will change for milestone 2) * Accelerated blocks are still responsible for data transfer but the custom buffer interface eliminates the double copy problem Signed-off-by: David Sorber <david.sorber@blacklynx.tech>
Diffstat (limited to 'gnuradio-runtime/lib/buffer.cc')
-rw-r--r--gnuradio-runtime/lib/buffer.cc273
1 files changed, 94 insertions, 179 deletions
diff --git a/gnuradio-runtime/lib/buffer.cc b/gnuradio-runtime/lib/buffer.cc
index cb6b949932..7fd0a39579 100644
--- a/gnuradio-runtime/lib/buffer.cc
+++ b/gnuradio-runtime/lib/buffer.cc
@@ -13,6 +13,10 @@
#endif
#include "vmcircbuf.h"
#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_double_mapped.h>
+#include <gnuradio/buffer_reader.h>
+#include <gnuradio/buffer_single_mapped.h>
+#include <gnuradio/buffer_type.h>
#include <gnuradio/integer_math.h>
#include <gnuradio/math.h>
#include <boost/format.hpp>
@@ -23,7 +27,6 @@
namespace gr {
static long s_buffer_count = 0; // counts for debugging storage mgmt
-static long s_buffer_reader_count = 0;
/* ----------------------------------------------------------------------------
Notes on storage management
@@ -52,117 +55,81 @@ static long s_buffer_reader_count = 0;
gr::buffer_reader goes to zero, we can successfully reclaim it.
---------------------------------------------------------------------------- */
-/*
- * Compute the minimum number of buffer items that work (i.e.,
- * address space wrap-around works). To work is to satisfy this
- * constraint for integer buffer_size and k:
- *
- * type_size * nitems == k * page_size
- */
-static inline long minimum_buffer_items(long type_size, long page_size)
-{
- return page_size / GR_GCD(type_size, page_size);
-}
-
-buffer::buffer(int nitems, size_t sizeof_item, block_sptr link)
+buffer::buffer(BufferMappingType buf_type,
+ int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ block_sptr link)
: d_base(0),
d_bufsize(0),
+ d_buf_map_type(buf_type),
d_max_reader_delay(0),
+ d_max_reader_history(1),
+ d_has_history(false),
d_sizeof_item(sizeof_item),
d_link(link),
d_write_index(0),
d_abs_write_offset(0),
d_done(false),
- d_last_min_items_read(0)
+ d_last_min_items_read(0),
+ d_callback_flag(false),
+ d_active_pointer_counter(0),
+ d_downstream_lcm_nitems(downstream_lcm_nitems),
+ d_write_multiple(0)
{
gr::configure_default_loggers(d_logger, d_debug_logger, "buffer");
- if (!allocate_buffer(nitems, sizeof_item))
- throw std::bad_alloc();
s_buffer_count++;
}
-buffer_sptr make_buffer(int nitems, size_t sizeof_item, block_sptr link)
+buffer_sptr make_buffer(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ block_sptr link,
+ block_sptr buf_owner)
{
- return buffer_sptr(new buffer(nitems, sizeof_item, link));
-}
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ gr::logger_ptr logger;
+ gr::logger_ptr debug_logger;
+ gr::configure_default_loggers(logger, debug_logger, "make_buffer");
+ std::ostringstream msg;
+#endif
-buffer::~buffer()
-{
- assert(d_readers.size() == 0);
- s_buffer_count--;
-}
+#if DEBUG_SINGLE_MAPPED
+ if (1) {
+#else
+ if (buf_owner->get_buffer_type() != buftype_DEFAULT_NON_CUSTOM::get()) {
+#endif
+ // Buffer type is NOT the default non custom variety so allocate a
+ // buffer_single_mapped instance
+#ifdef BUFFER_DEBUG
+ msg << "buffer_single_mapped nitems: " << nitems
+ << " -- sizeof_item: " << sizeof_item;
+ GR_LOG_DEBUG(logger, msg.str());
+#endif
-/*!
- * sets d_vmcircbuf, d_base, d_bufsize.
- * returns true iff successful.
- */
-bool buffer::allocate_buffer(int nitems, size_t sizeof_item)
-{
- int orig_nitems = nitems;
-
- // Any buffersize we come up with must be a multiple of min_nitems.
- int granularity = gr::vmcircbuf_sysconfig::granularity();
- int min_nitems = minimum_buffer_items(sizeof_item, granularity);
-
- // Round-up nitems to a multiple of min_nitems.
- if (nitems % min_nitems != 0)
- nitems = ((nitems / min_nitems) + 1) * min_nitems;
-
- // If we rounded-up a whole bunch, give the user a heads up.
- // This only happens if sizeof_item is not a power of two.
-
- if (nitems > 2 * orig_nitems && nitems * (int)sizeof_item > granularity) {
- auto msg =
- str(boost::format(
- "allocate_buffer: tried to allocate"
- " %d items of size %d. Due to alignment requirements"
- " %d were allocated. If this isn't OK, consider padding"
- " your structure to a power-of-two bytes."
- " On this platform, our allocation granularity is %d bytes.") %
- orig_nitems % sizeof_item % nitems % granularity);
- GR_LOG_WARN(d_logger, msg.c_str());
- }
+ return buffer_sptr(new buffer_single_mapped(
+ nitems, sizeof_item, downstream_lcm_nitems, link, buf_owner));
- d_bufsize = nitems;
- d_vmcircbuf.reset(gr::vmcircbuf_sysconfig::make(d_bufsize * d_sizeof_item));
- if (d_vmcircbuf == 0) {
- std::ostringstream msg;
- msg << "gr::buffer::allocate_buffer: failed to allocate buffer of size "
- << d_bufsize * d_sizeof_item / 1024 << " KB";
- GR_LOG_ERROR(d_logger, msg.str());
- return false;
- }
+ } else {
+ // Default to allocating a buffer_double_mapped instance
+#ifdef BUFFER_DEBUG
+ msg << "buffer_double_mapped nitems: " << nitems
+ << " -- sizeof_item: " << sizeof_item;
+ GR_LOG_DEBUG(logger, msg.str());
+#endif
- d_base = (char*)d_vmcircbuf->pointer_to_first_copy();
- return true;
+ return buffer_sptr(
+ new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link));
+ }
}
-int buffer::space_available()
+buffer::~buffer()
{
- if (d_readers.empty())
- return d_bufsize - 1; // See comment below
-
- else {
- // Find out the maximum amount of data available to our readers
-
- int most_data = d_readers[0]->items_available();
- uint64_t min_items_read = d_readers[0]->nitems_read();
- for (size_t i = 1; i < d_readers.size(); i++) {
- most_data = std::max(most_data, d_readers[i]->items_available());
- min_items_read = std::min(min_items_read, d_readers[i]->nitems_read());
- }
-
- if (min_items_read != d_last_min_items_read) {
- prune_tags(d_last_min_items_read);
- d_last_min_items_read = min_items_read;
- }
-
- // The -1 ensures that the case d_write_index == d_read_index is
- // unambiguous. It indicates that there is no data for the reader
- return d_bufsize - most_data - 1;
- }
+ assert(d_readers.size() == 0);
+ s_buffer_count--;
}
void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; }
@@ -170,8 +137,22 @@ void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; }
void buffer::update_write_pointer(int nitems)
{
gr::thread::scoped_lock guard(*mutex());
+
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ unsigned orig_wr_idx = d_write_index;
+#endif
+
d_write_index = index_add(d_write_index, nitems);
d_abs_write_offset += nitems;
+
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] update_write_pointer -- orig d_write_index: " << orig_wr_idx
+ << " -- nitems: " << nitems << " -- d_write_index: " << d_write_index;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
}
void buffer::set_done(bool done)
@@ -180,20 +161,6 @@ void buffer::set_done(bool done)
d_done = done;
}
-buffer_reader_sptr
-buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay)
-{
- if (nzero_preload < 0)
- throw std::invalid_argument("buffer_add_reader: nzero_preload must be >= 0");
-
- buffer_reader_sptr r(
- new buffer_reader(buf, buf->index_sub(buf->d_write_index, nzero_preload), link));
- r->declare_sample_delay(delay);
- buf->d_readers.push_back(r.get());
-
- return r;
-}
-
void buffer::drop_reader(buffer_reader* reader)
{
std::vector<buffer_reader*>::iterator result =
@@ -263,92 +230,40 @@ void buffer::prune_tags(uint64_t max_time)
}
}
-long buffer_ncurrently_allocated() { return s_buffer_count; }
-
-// ----------------------------------------------------------------------------
-
-buffer_reader::buffer_reader(buffer_sptr buffer, unsigned int read_index, block_sptr link)
- : d_buffer(buffer),
- d_read_index(read_index),
- d_abs_read_offset(0),
- d_link(link),
- d_attr_delay(0)
-{
- s_buffer_reader_count++;
-}
-
-buffer_reader::~buffer_reader()
+void buffer::on_lock(gr::thread::scoped_lock& lock)
{
- d_buffer->drop_reader(this);
- s_buffer_reader_count--;
+ // NOTE: the protecting mutex (scoped_lock) is held by the custom_lock object
+
+ // Wait until no other callback is active and no pointers are active for
+ // the buffer, then mark the callback flag active.
+ d_cv.wait(lock, [this]() {
+ return (d_callback_flag == false && d_active_pointer_counter == 0);
+ });
+ d_callback_flag = true;
}
-void buffer_reader::declare_sample_delay(unsigned delay)
+void buffer::on_unlock()
{
- d_attr_delay = delay;
- d_buffer->d_max_reader_delay = std::max(d_attr_delay, d_buffer->d_max_reader_delay);
-}
-
-unsigned buffer_reader::sample_delay() const { return d_attr_delay; }
-
-int buffer_reader::items_available() const
-{
- return d_buffer->index_sub(d_buffer->d_write_index, d_read_index);
-}
+ // NOTE: the protecting mutex (scoped_lock) is held by the custom_lock object
-const void* buffer_reader::read_pointer()
-{
- return &d_buffer->d_base[d_read_index * d_buffer->d_sizeof_item];
+ // Mark the callback flag inactive and notify anyone waiting
+ d_callback_flag = false;
+ d_cv.notify_all();
}
-void buffer_reader::update_read_pointer(int nitems)
-{
- gr::thread::scoped_lock guard(*mutex());
- d_read_index = d_buffer->index_add(d_read_index, nitems);
- d_abs_read_offset += nitems;
-}
+long buffer_ncurrently_allocated() { return s_buffer_count; }
-void buffer_reader::get_tags_in_range(std::vector<tag_t>& v,
- uint64_t abs_start,
- uint64_t abs_end,
- long id)
+std::ostream& operator<<(std::ostream& os, const buffer& buf)
{
- gr::thread::scoped_lock guard(*mutex());
-
- uint64_t lower_bound = abs_start - d_attr_delay;
- // check for underflow and if so saturate at 0
- if (lower_bound > abs_start)
- lower_bound = 0;
- uint64_t upper_bound = abs_end - d_attr_delay;
- // check for underflow and if so saturate at 0
- if (upper_bound > abs_end)
- upper_bound = 0;
-
- v.clear();
- std::multimap<uint64_t, tag_t>::iterator itr =
- d_buffer->get_tags_lower_bound(lower_bound);
- std::multimap<uint64_t, tag_t>::iterator itr_end =
- d_buffer->get_tags_upper_bound(upper_bound);
-
- uint64_t item_time;
- while (itr != itr_end) {
- item_time = (*itr).second.offset + d_attr_delay;
- if ((item_time >= abs_start) && (item_time < abs_end)) {
- std::vector<long>::iterator id_itr;
- id_itr = std::find(
- itr->second.marked_deleted.begin(), itr->second.marked_deleted.end(), id);
- // If id is not in the vector of marked blocks
- if (id_itr == itr->second.marked_deleted.end()) {
- tag_t t = (*itr).second;
- t.offset += d_attr_delay;
- v.push_back(t);
- v.back().marked_deleted.clear();
- }
- }
- itr++;
+ os << std::endl
+ << " sz: " << buf.d_bufsize << std::endl
+ << " nrdrs: " << buf.d_readers.size() << std::endl;
+ for (auto& rdr : buf.d_readers) {
+ os << " rd_idx: " << rdr->get_read_index() << std::endl
+ << " abs_rd_offset: " << rdr->get_abs_read_offset() << std::endl
+ << std::endl;
}
+ return os;
}
-long buffer_reader_ncurrently_allocated() { return s_buffer_reader_count; }
-
} /* namespace gr */