author     David Sorber <david.sorber@blacklynx.tech>        2021-05-12 08:59:21 -0400
committer  mormj <34754695+mormj@users.noreply.github.com>   2021-10-25 11:27:01 -0400
commit     788827ae116bef871e144abd39b1e4482208eabe (patch)
tree       dcfee04a77db5bb3c8042be5b0b95c54bf8759c9 /gnuradio-runtime/lib/buffer.cc
parent     b8713810a2d07ac1a632bd7bfb23f3f48f67e222 (diff)
runtime: Custom Buffer/Accelerator Device Support - Milestone 1
Custom Buffer/Accelerator Device Support - Milestone 1 changes:
* Refactored existing single mapped buffer code and created single
mapped buffer abstraction; wrapping within single mapped buffers
is handled explicitly by input blocked and output blocked
callbacks that are called from block_executor
* Added simple custom buffer allocation interface (NOTE: this
interface will change for milestone 2)
* Accelerated blocks are still responsible for data transfer but the
custom buffer interface eliminates the double copy problem
Signed-off-by: David Sorber <david.sorber@blacklynx.tech>
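
To make the milestone-1 allocation path concrete, the following is a minimal, self-contained C++ sketch of the buffer-type dispatch that make_buffer() gains in this commit. It is an editor's illustration, not GNU Radio's actual API: buffer_type, make_buffer_for, and my_accel_block are hypothetical stand-ins for the buftype_DEFAULT_NON_CUSTOM::get() / get_buffer_type() machinery visible in the diff below; only the selection logic (non-default buffer type selects a single-mapped buffer, everything else keeps the double-mapped circular buffer) mirrors the change.

#include <iostream>
#include <memory>

// Stand-ins for the two buffer implementations referenced in the diff below.
struct buffer { virtual ~buffer() = default; };
struct buffer_double_mapped : buffer {}; // default: virtual-memory double mapping
struct buffer_single_mapped : buffer {}; // single mapping, e.g. device-visible memory

// Minimal stand-in for a per-block "buffer type" tag (hypothetical values).
enum class buffer_type { default_non_custom, custom_host, custom_device };

// Stand-in for a block that can report which buffer type its output needs.
struct block {
    virtual buffer_type get_buffer_type() const { return buffer_type::default_non_custom; }
    virtual ~block() = default;
};

// Mirrors the dispatch added to make_buffer(): a non-default buffer type gets a
// single-mapped buffer, everything else keeps the double-mapped circular buffer.
std::shared_ptr<buffer> make_buffer_for(const block& owner)
{
    if (owner.get_buffer_type() != buffer_type::default_non_custom)
        return std::make_shared<buffer_single_mapped>();
    return std::make_shared<buffer_double_mapped>();
}

// Hypothetical accelerated block that asks for a custom (device) buffer.
struct my_accel_block : block {
    buffer_type get_buffer_type() const override { return buffer_type::custom_device; }
};

int main()
{
    block cpu_block;
    my_accel_block accel_block;
    std::cout << "cpu block single-mapped?   "
              << (dynamic_cast<buffer_single_mapped*>(make_buffer_for(cpu_block).get()) != nullptr)
              << "\naccel block single-mapped? "
              << (dynamic_cast<buffer_single_mapped*>(make_buffer_for(accel_block).get()) != nullptr)
              << "\n";
}

In the real code the factory also threads through nitems, sizeof_item, downstream_lcm_nitems, and the owning block; the single-mapped path is what lets an accelerated block hand its data to the device without the double copy mentioned in the commit message.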
Diffstat (limited to 'gnuradio-runtime/lib/buffer.cc')
-rw-r--r--   gnuradio-runtime/lib/buffer.cc   273
1 file changed, 94 insertions, 179 deletions
diff --git a/gnuradio-runtime/lib/buffer.cc b/gnuradio-runtime/lib/buffer.cc
index cb6b949932..7fd0a39579 100644
--- a/gnuradio-runtime/lib/buffer.cc
+++ b/gnuradio-runtime/lib/buffer.cc
@@ -13,6 +13,10 @@
 #endif
 #include "vmcircbuf.h"
 #include <gnuradio/buffer.h>
+#include <gnuradio/buffer_double_mapped.h>
+#include <gnuradio/buffer_reader.h>
+#include <gnuradio/buffer_single_mapped.h>
+#include <gnuradio/buffer_type.h>
 #include <gnuradio/integer_math.h>
 #include <gnuradio/math.h>
 #include <boost/format.hpp>
@@ -23,7 +27,6 @@ namespace gr {
 
 static long s_buffer_count = 0; // counts for debugging storage mgmt
-static long s_buffer_reader_count = 0;
 
 /* ----------------------------------------------------------------------------
                       Notes on storage management
@@ -52,117 +55,81 @@ static long s_buffer_reader_count = 0;
    gr::buffer_reader goes to zero, we can successfully reclaim it.
    ---------------------------------------------------------------------------- */
 
-/*
- * Compute the minimum number of buffer items that work (i.e.,
- * address space wrap-around works). To work is to satisfy this
- * constraint for integer buffer_size and k:
- *
- *     type_size * nitems == k * page_size
- */
-static inline long minimum_buffer_items(long type_size, long page_size)
-{
-    return page_size / GR_GCD(type_size, page_size);
-}
-
-buffer::buffer(int nitems, size_t sizeof_item, block_sptr link)
+buffer::buffer(BufferMappingType buf_type,
+               int nitems,
+               size_t sizeof_item,
+               uint64_t downstream_lcm_nitems,
+               block_sptr link)
     : d_base(0),
       d_bufsize(0),
+      d_buf_map_type(buf_type),
       d_max_reader_delay(0),
+      d_max_reader_history(1),
+      d_has_history(false),
       d_sizeof_item(sizeof_item),
       d_link(link),
       d_write_index(0),
       d_abs_write_offset(0),
       d_done(false),
-      d_last_min_items_read(0)
+      d_last_min_items_read(0),
+      d_callback_flag(false),
+      d_active_pointer_counter(0),
+      d_downstream_lcm_nitems(downstream_lcm_nitems),
+      d_write_multiple(0)
 {
     gr::configure_default_loggers(d_logger, d_debug_logger, "buffer");
-    if (!allocate_buffer(nitems, sizeof_item))
-        throw std::bad_alloc();
     s_buffer_count++;
 }
 
-buffer_sptr make_buffer(int nitems, size_t sizeof_item, block_sptr link)
+buffer_sptr make_buffer(int nitems,
+                        size_t sizeof_item,
+                        uint64_t downstream_lcm_nitems,
+                        block_sptr link,
+                        block_sptr buf_owner)
 {
-    return buffer_sptr(new buffer(nitems, sizeof_item, link));
-}
+#ifdef BUFFER_DEBUG
+    // BUFFER DEBUG
+    gr::logger_ptr logger;
+    gr::logger_ptr debug_logger;
+    gr::configure_default_loggers(logger, debug_logger, "make_buffer");
+    std::ostringstream msg;
+#endif
 
-buffer::~buffer()
-{
-    assert(d_readers.size() == 0);
-    s_buffer_count--;
-}
+#if DEBUG_SINGLE_MAPPED
+    if (1) {
+#else
+    if (buf_owner->get_buffer_type() != buftype_DEFAULT_NON_CUSTOM::get()) {
+#endif
+        // Buffer type is NOT the default non custom variety so allocate a
+        // buffer_single_mapped instance
+#ifdef BUFFER_DEBUG
+        msg << "buffer_single_mapped nitems: " << nitems
+            << " -- sizeof_item: " << sizeof_item;
+        GR_LOG_DEBUG(logger, msg.str());
+#endif
 
-/*!
- * sets d_vmcircbuf, d_base, d_bufsize.
- * returns true iff successful.
- */
-bool buffer::allocate_buffer(int nitems, size_t sizeof_item)
-{
-    int orig_nitems = nitems;
-
-    // Any buffersize we come up with must be a multiple of min_nitems.
-    int granularity = gr::vmcircbuf_sysconfig::granularity();
-    int min_nitems = minimum_buffer_items(sizeof_item, granularity);
-
-    // Round-up nitems to a multiple of min_nitems.
-    if (nitems % min_nitems != 0)
-        nitems = ((nitems / min_nitems) + 1) * min_nitems;
-
-    // If we rounded-up a whole bunch, give the user a heads up.
-    // This only happens if sizeof_item is not a power of two.
-
-    if (nitems > 2 * orig_nitems && nitems * (int)sizeof_item > granularity) {
-        auto msg =
-            str(boost::format(
-                    "allocate_buffer: tried to allocate"
-                    " %d items of size %d. Due to alignment requirements"
-                    " %d were allocated. If this isn't OK, consider padding"
-                    " your structure to a power-of-two bytes."
-                    " On this platform, our allocation granularity is %d bytes.") %
-                orig_nitems % sizeof_item % nitems % granularity);
-        GR_LOG_WARN(d_logger, msg.c_str());
-    }
+        return buffer_sptr(new buffer_single_mapped(
+            nitems, sizeof_item, downstream_lcm_nitems, link, buf_owner));
 
-    d_bufsize = nitems;
-    d_vmcircbuf.reset(gr::vmcircbuf_sysconfig::make(d_bufsize * d_sizeof_item));
-    if (d_vmcircbuf == 0) {
-        std::ostringstream msg;
-        msg << "gr::buffer::allocate_buffer: failed to allocate buffer of size "
-            << d_bufsize * d_sizeof_item / 1024 << " KB";
-        GR_LOG_ERROR(d_logger, msg.str());
-        return false;
-    }
+    } else {
+        // Default to allocating a buffer_double_mapped instance
+#ifdef BUFFER_DEBUG
+        msg << "buffer_double_mapped nitems: " << nitems
+            << " -- sizeof_item: " << sizeof_item;
+        GR_LOG_DEBUG(logger, msg.str());
+#endif
 
-    d_base = (char*)d_vmcircbuf->pointer_to_first_copy();
-    return true;
+        return buffer_sptr(
+            new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link));
+    }
 }
 
-int buffer::space_available()
+buffer::~buffer()
 {
-    if (d_readers.empty())
-        return d_bufsize - 1; // See comment below
-
-    else {
-        // Find out the maximum amount of data available to our readers
-
-        int most_data = d_readers[0]->items_available();
-        uint64_t min_items_read = d_readers[0]->nitems_read();
-        for (size_t i = 1; i < d_readers.size(); i++) {
-            most_data = std::max(most_data, d_readers[i]->items_available());
-            min_items_read = std::min(min_items_read, d_readers[i]->nitems_read());
-        }
-
-        if (min_items_read != d_last_min_items_read) {
-            prune_tags(d_last_min_items_read);
-            d_last_min_items_read = min_items_read;
-        }
-
-        // The -1 ensures that the case d_write_index == d_read_index is
-        // unambiguous. It indicates that there is no data for the reader
-        return d_bufsize - most_data - 1;
-    }
+    assert(d_readers.size() == 0);
+    s_buffer_count--;
 }
 
 void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; }
@@ -170,8 +137,22 @@ void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; }
 void buffer::update_write_pointer(int nitems)
 {
     gr::thread::scoped_lock guard(*mutex());
+
+#ifdef BUFFER_DEBUG
+    // BUFFER DEBUG
+    unsigned orig_wr_idx = d_write_index;
+#endif
+
     d_write_index = index_add(d_write_index, nitems);
     d_abs_write_offset += nitems;
+
+#ifdef BUFFER_DEBUG
+    // BUFFER DEBUG
+    std::ostringstream msg;
+    msg << "[" << this << "] update_write_pointer -- orig d_write_index: " << orig_wr_idx
+        << " -- nitems: " << nitems << " -- d_write_index: " << d_write_index;
+    GR_LOG_DEBUG(d_logger, msg.str());
+#endif
 }
 
 void buffer::set_done(bool done)
@@ -180,20 +161,6 @@ void buffer::set_done(bool done)
     d_done = done;
 }
 
-buffer_reader_sptr
-buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay)
-{
-    if (nzero_preload < 0)
-        throw std::invalid_argument("buffer_add_reader: nzero_preload must be >= 0");
-
-    buffer_reader_sptr r(
-        new buffer_reader(buf, buf->index_sub(buf->d_write_index, nzero_preload), link));
-    r->declare_sample_delay(delay);
-    buf->d_readers.push_back(r.get());
-
-    return r;
-}
-
 void buffer::drop_reader(buffer_reader* reader)
 {
     std::vector<buffer_reader*>::iterator result =
@@ -263,92 +230,40 @@ void buffer::prune_tags(uint64_t max_time)
     }
 }
 
-long buffer_ncurrently_allocated() { return s_buffer_count; }
-
-// ----------------------------------------------------------------------------
-
-buffer_reader::buffer_reader(buffer_sptr buffer, unsigned int read_index, block_sptr link)
-    : d_buffer(buffer),
-      d_read_index(read_index),
-      d_abs_read_offset(0),
-      d_link(link),
-      d_attr_delay(0)
-{
-    s_buffer_reader_count++;
-}
-
-buffer_reader::~buffer_reader()
+void buffer::on_lock(gr::thread::scoped_lock& lock)
 {
-    d_buffer->drop_reader(this);
-    s_buffer_reader_count--;
+    // NOTE: the protecting mutex (scoped_lock) is held by the custom_lock object
+
+    // Wait until no other callback is active and no pointers are active for
+    // the buffer, then mark the callback flag active.
+    d_cv.wait(lock, [this]() {
+        return (d_callback_flag == false && d_active_pointer_counter == 0);
+    });
+    d_callback_flag = true;
 }
 
-void buffer_reader::declare_sample_delay(unsigned delay)
+void buffer::on_unlock()
 {
-    d_attr_delay = delay;
-    d_buffer->d_max_reader_delay = std::max(d_attr_delay, d_buffer->d_max_reader_delay);
-}
-
-unsigned buffer_reader::sample_delay() const { return d_attr_delay; }
-
-int buffer_reader::items_available() const
-{
-    return d_buffer->index_sub(d_buffer->d_write_index, d_read_index);
-}
+    // NOTE: the protecting mutex (scoped_lock) is held by the custom_lock object
 
-const void* buffer_reader::read_pointer()
-{
-    return &d_buffer->d_base[d_read_index * d_buffer->d_sizeof_item];
+    // Mark the callback flag inactive and notify anyone waiting
+    d_callback_flag = false;
+    d_cv.notify_all();
 }
 
-void buffer_reader::update_read_pointer(int nitems)
-{
-    gr::thread::scoped_lock guard(*mutex());
-    d_read_index = d_buffer->index_add(d_read_index, nitems);
-    d_abs_read_offset += nitems;
-}
+long buffer_ncurrently_allocated() { return s_buffer_count; }
 
-void buffer_reader::get_tags_in_range(std::vector<tag_t>& v,
-                                      uint64_t abs_start,
-                                      uint64_t abs_end,
-                                      long id)
+std::ostream& operator<<(std::ostream& os, const buffer& buf)
 {
-    gr::thread::scoped_lock guard(*mutex());
-
-    uint64_t lower_bound = abs_start - d_attr_delay;
-    // check for underflow and if so saturate at 0
-    if (lower_bound > abs_start)
-        lower_bound = 0;
-    uint64_t upper_bound = abs_end - d_attr_delay;
-    // check for underflow and if so saturate at 0
-    if (upper_bound > abs_end)
-        upper_bound = 0;
-
-    v.clear();
-    std::multimap<uint64_t, tag_t>::iterator itr =
-        d_buffer->get_tags_lower_bound(lower_bound);
-    std::multimap<uint64_t, tag_t>::iterator itr_end =
-        d_buffer->get_tags_upper_bound(upper_bound);
-
-    uint64_t item_time;
-    while (itr != itr_end) {
-        item_time = (*itr).second.offset + d_attr_delay;
-        if ((item_time >= abs_start) && (item_time < abs_end)) {
-            std::vector<long>::iterator id_itr;
-            id_itr = std::find(
-                itr->second.marked_deleted.begin(), itr->second.marked_deleted.end(), id);
-            // If id is not in the vector of marked blocks
-            if (id_itr == itr->second.marked_deleted.end()) {
-                tag_t t = (*itr).second;
-                t.offset += d_attr_delay;
-                v.push_back(t);
-                v.back().marked_deleted.clear();
-            }
-        }
-        itr++;
+    os << std::endl
+       << "  sz: " << buf.d_bufsize << std::endl
+       << "  nrdrs: " << buf.d_readers.size() << std::endl;
+    for (auto& rdr : buf.d_readers) {
+        os << "    rd_idx: " << rdr->get_read_index() << std::endl
+           << "    abs_rd_offset: " << rdr->get_abs_read_offset() << std::endl
+           << std::endl;
     }
+    return os;
 }
 
-long buffer_reader_ncurrently_allocated() { return s_buffer_reader_count; }
-
 } /* namespace gr */
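
The on_lock()/on_unlock() pair added above gates the new input-blocked/output-blocked callbacks: a callback may only run while no other callback is active and no raw buffer pointers are outstanding. The comments refer to a custom_lock object that holds the protecting mutex; the sketch below is a self-contained model of that RAII pattern, not the GNU Radio implementation. callback_gate and scoped_callback_lock are hypothetical names standing in for the buffer's d_cv / d_callback_flag / d_active_pointer_counter state and for the custom_lock wrapper.

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// Minimal stand-in for the buffer's callback-gating state shown in the diff.
class callback_gate {
public:
    // Mirrors buffer::on_lock(): wait until no callback is running and no
    // pointers are outstanding, then mark a callback as active.
    void on_lock(std::unique_lock<std::mutex>& lock)
    {
        d_cv.wait(lock, [this]() {
            return !d_callback_flag && d_active_pointer_counter == 0;
        });
        d_callback_flag = true;
    }

    // Mirrors buffer::on_unlock(): clear the flag and wake any waiters.
    void on_unlock()
    {
        d_callback_flag = false;
        d_cv.notify_all();
    }

    std::mutex& mutex() { return d_mutex; }

private:
    std::mutex d_mutex;
    std::condition_variable d_cv;
    bool d_callback_flag = false;
    int d_active_pointer_counter = 0;
};

// Hypothetical RAII wrapper playing the role of the "custom_lock object":
// it holds the mutex for its lifetime and brackets a blocked-state callback
// with on_lock()/on_unlock().
class scoped_callback_lock {
public:
    explicit scoped_callback_lock(callback_gate& gate)
        : d_gate(gate), d_lock(gate.mutex())
    {
        d_gate.on_lock(d_lock);
    }
    ~scoped_callback_lock() { d_gate.on_unlock(); }

private:
    callback_gate& d_gate;
    std::unique_lock<std::mutex> d_lock;
};

int main()
{
    callback_gate gate;
    auto worker = [&gate](const char* name) {
        scoped_callback_lock guard(gate); // only one callback runs at a time
        std::cout << name << " running output-blocked callback\n";
    };
    std::thread t1(worker, "thread 1");
    std::thread t2(worker, "thread 2");
    t1.join();
    t2.join();
}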