diff options
author | David Sorber <david.sorber@blacklynx.tech> | 2021-07-29 11:34:37 -0400 |
---|---|---|
committer | mormj <34754695+mormj@users.noreply.github.com> | 2021-10-25 11:27:01 -0400 |
commit | f3c558d88bc68d865f823c31e7d9aa78b3feab59 (patch) | |
tree | 026d8ff5568bc2c5ab731df29d87901cf1177e00 /gnuradio-runtime/lib | |
parent | 788827ae116bef871e144abd39b1e4482208eabe (diff) |
runtime: Custom Buffer/Accelerator Device Support - Milestone 2
Completion of custom buffer/accelerator device support changes:
* Improved custom buffer interface by removing awkward memory
allocation functions from the block class
* Increased flexibility for creating custom buffers by allowing
creation of buffer_single_mapped subclasses
* Fully incorporated data movement abstraction into the custom
buffer interface and the runtime itself; accelerated blocks are no
longer directly responsible for their own data movement
* Zero copy back-to-back accelerated blocks are now supported (data
no longer needs to be moved back to the host between each block)
Signed-off-by: David Sorber <david.sorber@blacklynx.tech>
Signed-off-by: Mike Mason <mike.mason@blacklynx.tech>
Diffstat (limited to 'gnuradio-runtime/lib')
-rw-r--r-- | gnuradio-runtime/lib/CMakeLists.txt | 4 | ||||
-rw-r--r-- | gnuradio-runtime/lib/block.cc | 63 | ||||
-rw-r--r-- | gnuradio-runtime/lib/block_detail.cc | 2 | ||||
-rw-r--r-- | gnuradio-runtime/lib/block_executor.cc | 22 | ||||
-rw-r--r-- | gnuradio-runtime/lib/buffer.cc | 71 | ||||
-rw-r--r-- | gnuradio-runtime/lib/buffer_context.cc | 32 | ||||
-rw-r--r-- | gnuradio-runtime/lib/buffer_double_mapped.cc | 28 | ||||
-rw-r--r-- | gnuradio-runtime/lib/buffer_reader.cc | 26 | ||||
-rw-r--r-- | gnuradio-runtime/lib/buffer_reader_sm.cc | 85 | ||||
-rw-r--r-- | gnuradio-runtime/lib/buffer_single_mapped.cc | 337 | ||||
-rw-r--r-- | gnuradio-runtime/lib/buffer_type.cc | 18 | ||||
-rw-r--r-- | gnuradio-runtime/lib/flat_flowgraph.cc | 67 | ||||
-rw-r--r-- | gnuradio-runtime/lib/host_buffer.cc | 255 | ||||
-rw-r--r-- | gnuradio-runtime/lib/io_signature.cc | 72 | ||||
-rw-r--r-- | gnuradio-runtime/lib/qa_buffer.cc | 8 | ||||
-rw-r--r-- | gnuradio-runtime/lib/qa_host_buffer.cc | 334 |
16 files changed, 1089 insertions, 335 deletions
diff --git a/gnuradio-runtime/lib/CMakeLists.txt b/gnuradio-runtime/lib/CMakeLists.txt index ca96549386..6fcd31bac4 100644 --- a/gnuradio-runtime/lib/CMakeLists.txt +++ b/gnuradio-runtime/lib/CMakeLists.txt @@ -55,16 +55,17 @@ add_library(gnuradio-runtime block_gateway_impl.cc block_registry.cc buffer.cc + buffer_context.cc buffer_double_mapped.cc buffer_reader.cc buffer_reader_sm.cc buffer_single_mapped.cc - buffer_type.cc flat_flowgraph.cc flowgraph.cc hier_block2.cc hier_block2_detail.cc high_res_timer.cc + host_buffer.cc io_signature.cc local_sighandler.cc logger.cc @@ -335,6 +336,7 @@ if(ENABLE_TESTING) qa_buffer.cc qa_io_signature.cc qa_logger.cc + qa_host_buffer.cc qa_vmcircbuf.cc ) list(APPEND GR_TEST_TARGET_DEPS gnuradio-runtime gnuradio-pmt) diff --git a/gnuradio-runtime/lib/block.cc b/gnuradio-runtime/lib/block.cc index bb6ce95298..75f9dc6ae4 100644 --- a/gnuradio-runtime/lib/block.cc +++ b/gnuradio-runtime/lib/block.cc @@ -384,7 +384,8 @@ void block::set_min_output_buffer(int port, long min_output_buffer) void block::allocate_detail(int ninputs, int noutputs, const std::vector<int>& downstream_max_nitems_vec, - const std::vector<uint64_t>& downstream_lcm_nitems_vec) + const std::vector<uint64_t>& downstream_lcm_nitems_vec, + const std::vector<uint32_t>& downstream_max_out_mult_vec) { block_detail_sptr detail = make_block_detail(ninputs, noutputs); @@ -393,8 +394,10 @@ void block::allocate_detail(int ninputs, for (int i = 0; i < noutputs; i++) { expand_minmax_buffer(i); - buffer_sptr buffer = allocate_buffer( - i, downstream_max_nitems_vec[i], downstream_lcm_nitems_vec[i]); + buffer_sptr buffer = allocate_buffer(i, + downstream_max_nitems_vec[i], + downstream_lcm_nitems_vec[i], + downstream_max_out_mult_vec[i]); GR_LOG_DEBUG(d_debug_logger, "Allocated buffer for output " + identifier() + " " + std::to_string(i)); @@ -413,19 +416,24 @@ void block::allocate_detail(int ninputs, set_detail(detail); } -buffer_sptr block::replace_buffer(uint32_t out_port, block_sptr block_owner) +buffer_sptr +block::replace_buffer(uint32_t src_port, uint32_t dst_port, block_sptr block_owner) { block_detail_sptr detail_ = detail(); - buffer_sptr orig_buffer = detail_->output(out_port); + buffer_sptr orig_buffer = detail_->output(src_port); - // Make a new buffer but this time use the passed in block as the owner - buffer_sptr new_buffer = make_buffer(orig_buffer->bufsize(), - orig_buffer->get_sizeof_item(), - orig_buffer->get_downstream_lcm_nitems(), - shared_from_base<block>(), - block_owner); + buffer_type buftype = block_owner->output_signature()->stream_buffer_type(dst_port); - detail_->set_output(out_port, new_buffer); + // Make a new buffer but this time use the passed in block as the owner + buffer_sptr new_buffer = + buftype.make_buffer(orig_buffer->bufsize(), + orig_buffer->get_sizeof_item(), + orig_buffer->get_downstream_lcm_nitems(), + orig_buffer->get_max_reader_output_multiple(), + shared_from_base<block>(), + block_owner); + + detail_->set_output(src_port, new_buffer); return new_buffer; } @@ -435,7 +443,8 @@ void block::enable_update_rate(bool en) { d_update_rate = en; } buffer_sptr block::allocate_buffer(int port, int downstream_max_nitems, - uint64_t downstream_lcm_nitems) + uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult) { int item_size = output_signature()->sizeof_stream_item(port); @@ -473,14 +482,16 @@ buffer_sptr block::allocate_buffer(int port, buffer_sptr buf; #ifdef BUFFER_DEBUG - // BUFFER DEBUG GR_LOG_DEBUG(d_logger, "Block: " + name() + " allocated buffer for output " + identifier()); #endif + // Grab the buffer type associated with the output port and use it to + // create the specified type of buffer + buffer_type buftype = output_signature()->stream_buffer_type(port); + try { #ifdef BUFFER_DEBUG - // BUFFER DEBUG std::ostringstream msg; msg << "downstream_max_nitems: " << downstream_max_nitems << " -- downstream_lcm_nitems: " << downstream_lcm_nitems @@ -498,18 +509,20 @@ buffer_sptr block::allocate_buffer(int port, } GR_LOG_DEBUG(d_logger, msg.str()); #endif - buf = make_buffer(nitems, - item_size, - downstream_lcm_nitems, - shared_from_base<block>(), - shared_from_base<block>()); + buf = buftype.make_buffer(nitems, + item_size, + downstream_lcm_nitems, + downstream_max_out_mult, + shared_from_base<block>(), + shared_from_base<block>()); } catch (std::bad_alloc&) { - buf = make_buffer(nitems, - item_size, - downstream_lcm_nitems, - shared_from_base<block>(), - shared_from_base<block>()); + buf = buftype.make_buffer(nitems, + item_size, + downstream_lcm_nitems, + downstream_max_out_mult, + shared_from_base<block>(), + shared_from_base<block>()); } // Set the max noutput items size here to make sure it's always diff --git a/gnuradio-runtime/lib/block_detail.cc b/gnuradio-runtime/lib/block_detail.cc index f5283c56b0..66b32ec078 100644 --- a/gnuradio-runtime/lib/block_detail.cc +++ b/gnuradio-runtime/lib/block_detail.cc @@ -114,6 +114,7 @@ void block_detail::consume_each(int how_many_items) void block_detail::produce(int which_output, int how_many_items) { if (how_many_items > 0) { + d_output[which_output]->post_work(how_many_items); d_output[which_output]->update_write_pointer(how_many_items); d_produce_or |= how_many_items; } @@ -123,6 +124,7 @@ void block_detail::produce_each(int how_many_items) { if (how_many_items > 0) { for (int i = 0; i < noutputs(); i++) { + d_output[i]->post_work(how_many_items); d_output[i]->update_write_pointer(how_many_items); } d_produce_or |= how_many_items; diff --git a/gnuradio-runtime/lib/block_executor.cc b/gnuradio-runtime/lib/block_executor.cc index 6fbf2c5c17..ea59196571 100644 --- a/gnuradio-runtime/lib/block_executor.cc +++ b/gnuradio-runtime/lib/block_executor.cc @@ -59,9 +59,11 @@ static int min_available_space(block* m, int min_noutput_items, int& output_idx) { +#if ENABLE_LOGGING gr::logger_ptr logger; gr::logger_ptr debug_logger; gr::configure_default_loggers(logger, debug_logger, "min_available_space"); +#endif int min_space = std::numeric_limits<int>::max(); if (min_noutput_items == 0) @@ -285,7 +287,7 @@ block_executor::state block_executor::run_one_iteration() // determine the minimum available output space output_idx = 0; - out_try_again: + blkd_out_try_again: noutput_items = min_available_space( m, d, m->output_multiple(), m->min_noutput_items(), output_idx); noutput_items = std::min(noutput_items, max_noutput_items); @@ -312,7 +314,7 @@ block_executor::state block_executor::run_one_iteration() msg << m << " -- BLKD_OUT -- ([1] try again idx: " << output_idx << ")"; GR_LOG_INFO(d_debug_logger, msg.str());); - goto out_try_again; + goto blkd_out_try_again; } } else { return BLKD_OUT; @@ -329,7 +331,6 @@ block_executor::state block_executor::run_one_iteration() d_input_done.resize(d->ninputs()); d_output_items.resize(0); d_start_nitems_read.resize(d->ninputs()); - // LOG(GR_LOG_INFO(d_debug_logger, "sink");); LOG(std::ostringstream msg; msg << m << " -- sink"; GR_LOG_INFO(d_debug_logger, msg.str());); @@ -367,7 +368,6 @@ block_executor::state block_executor::run_one_iteration() GR_LOG_INFO(d_debug_logger, msg.str());); if (noutput_items == 0) { // we're blocked on input - // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN");); LOG(std::ostringstream msg; msg << m << " -- BLKD_IN"; GR_LOG_INFO(d_debug_logger, msg.str())); return BLKD_IN; @@ -400,7 +400,7 @@ block_executor::state block_executor::run_one_iteration() // determine the minimum available output space output_idx = 0; - out_try_again2: + blkd_out_try_again2: noutput_items = min_available_space( m, d, m->output_multiple(), m->min_noutput_items(), output_idx); if (ENABLE_LOGGING) { @@ -415,7 +415,6 @@ block_executor::state block_executor::run_one_iteration() goto were_done; if (noutput_items == 0) { // we're output blocked - // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT");); LOG(std::ostringstream msg; msg << m << " -- BLKD_OUT"; GR_LOG_INFO(d_debug_logger, msg.str())); @@ -435,7 +434,7 @@ block_executor::state block_executor::run_one_iteration() msg << m << " -- BLKD_OUT -- ([2] try again idx: " << output_idx << ")"; GR_LOG_INFO(d_debug_logger, msg.str());); - goto out_try_again2; + goto blkd_out_try_again2; } } else { return BLKD_OUT; @@ -532,10 +531,6 @@ block_executor::state block_executor::run_one_iteration() buffer_reader_sptr in_buf = d->input(i); - LOG(std::ostringstream msg; - msg << m << " (t: " << this << ") -- pre-callback"; - GR_LOG_DEBUG(d_debug_logger, msg.str())); - if (in_buf->input_blkd_cb_ready(d_ninput_items_required[i])) { gr::custom_lock lock(std::ref(*in_buf->mutex()), in_buf->buffer()); if (in_buf->input_blocked_callback(d_ninput_items_required[i], @@ -681,8 +676,8 @@ block_executor::state block_executor::run_one_iteration() GR_LOG_DEBUG(d_debug_logger, msg.str());); gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf); out_buf->output_blocked_callback(m->output_multiple(), true); - LOG(std::ostringstream msg; msg << m << " -- NO OUTPUT -- [" << i - << "] -- OUTPUT BLOCKED CBACK: " << rc; + LOG(std::ostringstream msg; + msg << m << " -- NO OUTPUT -- [" << i << "] -- OUTPUT BLOCKED CBACK "; GR_LOG_DEBUG(d_debug_logger, msg.str());); } @@ -692,7 +687,6 @@ block_executor::state block_executor::run_one_iteration() GR_LOG_ERROR(d_logger, "invalid state while going through iteration state machine"); were_done: - // LOG(GR_LOG_INFO(d_debug_logger, "we're done");); LOG(std::ostringstream msg; msg << m << " -- we're done"; GR_LOG_INFO(d_debug_logger, msg.str())); d->set_done(true); diff --git a/gnuradio-runtime/lib/buffer.cc b/gnuradio-runtime/lib/buffer.cc index 7fd0a39579..75fc447e8d 100644 --- a/gnuradio-runtime/lib/buffer.cc +++ b/gnuradio-runtime/lib/buffer.cc @@ -12,6 +12,7 @@ #include "config.h" #endif #include "vmcircbuf.h" +#include <gnuradio/block.h> #include <gnuradio/buffer.h> #include <gnuradio/buffer_double_mapped.h> #include <gnuradio/buffer_reader.h> @@ -56,10 +57,11 @@ static long s_buffer_count = 0; // counts for debugging storage mgmt ---------------------------------------------------------------------------- */ -buffer::buffer(BufferMappingType buf_type, +buffer::buffer(buffer_mapping_type buf_type, int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, block_sptr link) : d_base(0), d_bufsize(0), @@ -76,7 +78,9 @@ buffer::buffer(BufferMappingType buf_type, d_callback_flag(false), d_active_pointer_counter(0), d_downstream_lcm_nitems(downstream_lcm_nitems), - d_write_multiple(0) + d_write_multiple(0), + d_max_reader_output_multiple(downstream_max_out_mult), + d_context(buffer_context::DEFAULT_INVALID) { gr::configure_default_loggers(d_logger, d_debug_logger, "buffer"); @@ -86,44 +90,27 @@ buffer::buffer(BufferMappingType buf_type, buffer_sptr make_buffer(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, block_sptr link, block_sptr buf_owner) { #ifdef BUFFER_DEBUG - // BUFFER DEBUG gr::logger_ptr logger; gr::logger_ptr debug_logger; gr::configure_default_loggers(logger, debug_logger, "make_buffer"); std::ostringstream msg; #endif -#if DEBUG_SINGLE_MAPPED - if (1) { -#else - if (buf_owner->get_buffer_type() != buftype_DEFAULT_NON_CUSTOM::get()) { -#endif - // Buffer type is NOT the default non custom variety so allocate a - // buffer_single_mapped instance -#ifdef BUFFER_DEBUG - msg << "buffer_single_mapped nitems: " << nitems - << " -- sizeof_item: " << sizeof_item; - GR_LOG_DEBUG(logger, msg.str()); -#endif - - return buffer_sptr(new buffer_single_mapped( - nitems, sizeof_item, downstream_lcm_nitems, link, buf_owner)); - - } else { - // Default to allocating a buffer_double_mapped instance -#ifdef BUFFER_DEBUG - msg << "buffer_double_mapped nitems: " << nitems - << " -- sizeof_item: " << sizeof_item; - GR_LOG_DEBUG(logger, msg.str()); -#endif - - return buffer_sptr( - new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link)); - } + // NOTE: This function is no longer called by flat_flowgraph functions and + // therefore is somewhat deprecated. It will create and return a + // buffer_double_mapped subclass by default. + buffer_type buftype = buffer_double_mapped::type; + return buftype.make_buffer(nitems, + sizeof_item, + downstream_lcm_nitems, + downstream_max_out_mult, + link, + buf_owner); } buffer::~buffer() @@ -134,12 +121,16 @@ buffer::~buffer() void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; } +const void* buffer::_read_pointer(unsigned int read_index) +{ + return &d_base[read_index * d_sizeof_item]; +} + void buffer::update_write_pointer(int nitems) { gr::thread::scoped_lock guard(*mutex()); #ifdef BUFFER_DEBUG - // BUFFER DEBUG unsigned orig_wr_idx = d_write_index; #endif @@ -147,7 +138,6 @@ void buffer::update_write_pointer(int nitems) d_abs_write_offset += nitems; #ifdef BUFFER_DEBUG - // BUFFER DEBUG std::ostringstream msg; msg << "[" << this << "] update_write_pointer -- orig d_write_index: " << orig_wr_idx << " -- nitems: " << nitems << " -- d_write_index: " << d_write_index; @@ -266,4 +256,21 @@ std::ostream& operator<<(std::ostream& os, const buffer& buf) return os; } +void buffer::set_context(const buffer_context& context) +{ + if ((d_context == buffer_context::DEFAULT_INVALID) || (d_context == context)) { + // Set the context if the existing value is the default or if it is the + // same as what's already been set + d_context = context; + } else { + // Otherwise error out as the context value cannot be changed after + // it is set + std::ostringstream msg; + msg << "Block: " << link()->identifier() << " has context " << d_context + << " assigned. Cannot change to context " << context << "."; + GR_LOG_ERROR(d_logger, msg.str()); + throw std::runtime_error(msg.str()); + } +} + } /* namespace gr */ diff --git a/gnuradio-runtime/lib/buffer_context.cc b/gnuradio-runtime/lib/buffer_context.cc new file mode 100644 index 0000000000..9cd27add36 --- /dev/null +++ b/gnuradio-runtime/lib/buffer_context.cc @@ -0,0 +1,32 @@ +/* -*- c++ -*- */ +/* + * Copyright 2021 BlackLynx, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ +#include <gnuradio/buffer_context.h> + +namespace gr { + +std::ostream& operator<<(std::ostream& os, const buffer_context& context) +{ + switch (context) { + case buffer_context::DEFAULT_INVALID: + return os << "DEFAULT_INVALID"; + case buffer_context::HOST_TO_DEVICE: + return os << "HOST_TO_DEVICE"; + case buffer_context::DEVICE_TO_HOST: + return os << "DEVICE_TO_HOST"; + case buffer_context::HOST_TO_HOST: + return os << "HOST_TO_HOST"; + case buffer_context::DEVICE_TO_DEVICE: + return os << "DEVICE_TO_DEVICE"; + default: + return os << "Unknown buffer context: " << static_cast<int>(context); + } +} + +} // namespace gr diff --git a/gnuradio-runtime/lib/buffer_double_mapped.cc b/gnuradio-runtime/lib/buffer_double_mapped.cc index ad99c2162b..700ebac0c1 100644 --- a/gnuradio-runtime/lib/buffer_double_mapped.cc +++ b/gnuradio-runtime/lib/buffer_double_mapped.cc @@ -36,23 +36,25 @@ static inline long minimum_buffer_items(long type_size, long page_size) return page_size / GR_GCD(type_size, page_size); } +buffer_type buffer_double_mapped::type(buftype_DEFAULT_NON_CUSTOM{}); buffer_double_mapped::buffer_double_mapped(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, block_sptr link) - : buffer(BufferMappingType::DoubleMapped, + : buffer(buffer_mapping_type::double_mapped, nitems, sizeof_item, downstream_lcm_nitems, + downstream_max_out_mult, link) { gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_double_mapped"); - if (!allocate_buffer(nitems, sizeof_item)) + if (!allocate_buffer(nitems)) throw std::bad_alloc(); #ifdef BUFFER_DEBUG - // BUFFER DEBUG { std::ostringstream msg; msg << "[" << this << "] " @@ -62,13 +64,18 @@ buffer_double_mapped::buffer_double_mapped(int nitems, #endif } +// NB: Added the extra 'block_sptr unused' parameter so that the +// call signature matches the other factory-like functions used to create +// the buffer_single_mapped subclasses buffer_sptr make_buffer_double_mapped(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, - block_sptr link) + uint32_t downstream_max_out_mult, + block_sptr link, + block_sptr unused) { - return buffer_sptr( - new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link)); + return buffer_sptr(new buffer_double_mapped( + nitems, sizeof_item, downstream_lcm_nitems, downstream_max_out_mult, link)); } buffer_double_mapped::~buffer_double_mapped() {} @@ -77,13 +84,13 @@ buffer_double_mapped::~buffer_double_mapped() {} * sets d_vmcircbuf, d_base, d_bufsize. * returns true iff successful. */ -bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item) +bool buffer_double_mapped::allocate_buffer(int nitems) { int orig_nitems = nitems; // Any buffer size we come up with must be a multiple of min_nitems. int granularity = gr::vmcircbuf_sysconfig::granularity(); - int min_nitems = minimum_buffer_items(sizeof_item, granularity); + int min_nitems = minimum_buffer_items(d_sizeof_item, granularity); // Round-up nitems to a multiple of min_nitems. if (nitems % min_nitems != 0) @@ -91,7 +98,7 @@ bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item) // If we rounded-up a whole bunch, give the user a heads up. // This only happens if sizeof_item is not a power of two. - if (nitems > 2 * orig_nitems && nitems * (int)sizeof_item > granularity) { + if (nitems > 2 * orig_nitems && nitems * (int)d_sizeof_item > granularity) { auto msg = str(boost::format( "allocate_buffer: tried to allocate" @@ -99,7 +106,7 @@ bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item) " %d were allocated. If this isn't OK, consider padding" " your structure to a power-of-two bytes." " On this platform, our allocation granularity is %d bytes.") % - orig_nitems % sizeof_item % nitems % granularity); + orig_nitems % d_sizeof_item % nitems % granularity); GR_LOG_WARN(d_logger, msg.c_str()); } @@ -138,7 +145,6 @@ int buffer_double_mapped::space_available() } #ifdef BUFFER_DEBUG - // BUFFER DEBUG std::ostringstream msg; msg << "[" << this << "] " << "space_available() called d_write_index: " << d_write_index diff --git a/gnuradio-runtime/lib/buffer_reader.cc b/gnuradio-runtime/lib/buffer_reader.cc index 7ead53032e..b1c1e7fb73 100644 --- a/gnuradio-runtime/lib/buffer_reader.cc +++ b/gnuradio-runtime/lib/buffer_reader.cc @@ -34,11 +34,11 @@ buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay buffer_reader_sptr r; - if (buf->get_mapping_type() == BufferMappingType::DoubleMapped) { + if (buf->get_mapping_type() == buffer_mapping_type::double_mapped) { r.reset(new buffer_reader( buf, buf->index_sub(buf->d_write_index, nzero_preload), link)); r->declare_sample_delay(delay); - } else if (buf->get_mapping_type() == BufferMappingType::SingleMapped) { + } else if (buf->get_mapping_type() == buffer_mapping_type::single_mapped) { r.reset(new buffer_reader_sm( buf, buf->index_sub(buf->d_write_index, nzero_preload), link)); r->declare_sample_delay(delay); @@ -51,11 +51,15 @@ buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay buf->d_readers.push_back(r.get()); #ifdef BUFFER_DEBUG - // BUFFER DEBUG - std::cerr << " [" << buf.get() << ";" << r.get() - << "] buffer_add_reader() nzero_preload " << nzero_preload - << " -- delay: " << delay << " -- history: " << link->history() - << " -- RD_idx: " << r->d_read_index << std::endl; + gr::logger_ptr logger; + gr::logger_ptr debug_logger; + gr::configure_default_loggers(logger, debug_logger, "buffer_add_reader"); + + std::ostringstream msg; + msg << " [" << buf.get() << ";" << r.get() << "] buffer_add_reader() nzero_preload " + << nzero_preload << " -- delay: " << delay << " -- history: " << link->history() + << " -- RD_idx: " << r->d_read_index; + GR_LOG_DEBUG(debug_logger, msg.str()); #endif return r; @@ -89,12 +93,11 @@ void buffer_reader::declare_sample_delay(unsigned delay) unsigned buffer_reader::sample_delay() const { return d_attr_delay; } -int buffer_reader::items_available() // const +int buffer_reader::items_available() const { int available = d_buffer->index_sub(d_buffer->d_write_index, d_read_index); #ifdef BUFFER_DEBUG - // BUFFER DEBUG std::ostringstream msg; msg << "[" << d_buffer << ";" << this << "] " << "items_available() WR_idx: " << d_buffer->d_write_index @@ -109,7 +112,8 @@ int buffer_reader::items_available() // const const void* buffer_reader::read_pointer() { - return &d_buffer->d_base[d_read_index * d_buffer->d_sizeof_item]; + // Delegate to buffer subclass + return d_buffer->_read_pointer(d_read_index); } void buffer_reader::update_read_pointer(int nitems) @@ -117,7 +121,6 @@ void buffer_reader::update_read_pointer(int nitems) gr::thread::scoped_lock guard(*mutex()); #ifdef BUFFER_DEBUG - // BUFFER DEBUG unsigned orig_rd_idx = d_read_index; #endif @@ -125,7 +128,6 @@ void buffer_reader::update_read_pointer(int nitems) d_abs_read_offset += nitems; #ifdef BUFFER_DEBUG - // BUFFER DEBUG std::ostringstream msg; msg << "[" << d_buffer << ";" << this << "] update_read_pointer -- orig d_read_index: " << orig_rd_idx diff --git a/gnuradio-runtime/lib/buffer_reader_sm.cc b/gnuradio-runtime/lib/buffer_reader_sm.cc index 9e0fac584f..c0d4b07d65 100644 --- a/gnuradio-runtime/lib/buffer_reader_sm.cc +++ b/gnuradio-runtime/lib/buffer_reader_sm.cc @@ -26,13 +26,14 @@ namespace gr { buffer_reader_sm::~buffer_reader_sm() {} -int buffer_reader_sm::items_available() +int buffer_reader_sm::items_available() const { int available = 0; if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) { if (d_buffer->d_write_index == d_read_index) { - if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) { + if ((nitems_read() - sample_delay()) != + (d_buffer->nitems_written() + link()->history() - 1)) { available = d_buffer->d_bufsize - d_read_index; } } else { @@ -55,87 +56,15 @@ int buffer_reader_sm::items_available() bool buffer_reader_sm::input_blkd_cb_ready(int items_required) const { - gr::thread::scoped_lock(*d_buffer->mutex()); - - return (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) && - (d_buffer->d_write_index < d_read_index)); + return d_buffer->input_blkd_cb_ready(items_required, d_read_index); } bool buffer_reader_sm::input_blocked_callback(int items_required, int items_avail) { - // Maybe adjust read pointers from min read index? - // This would mean that *all* readers must be > (passed) the write index - if (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) && - (d_buffer->d_write_index < d_read_index)) { - - // Update items available before going farther as it could be stale - items_avail = items_available(); - - // Find reader with the smallest read index that is greater than the - // write index - uint32_t min_reader_index = std::numeric_limits<uint32_t>::max(); - uint32_t min_read_idx = std::numeric_limits<uint32_t>::max(); - for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) { - if (d_buffer->d_readers[idx]->d_read_index > d_buffer->d_write_index) { - // Record index of reader with minimum read-index - if (d_buffer->d_readers[idx]->d_read_index < min_read_idx) { - min_read_idx = d_buffer->d_readers[idx]->d_read_index; - min_reader_index = idx; - } - } - } - - // Note items_avail might be zero, that's okay. - items_avail += d_read_index - min_read_idx; - int gap = min_read_idx - d_buffer->d_write_index; - if (items_avail > gap) { - return false; - } - -#ifdef BUFFER_DEBUG - std::ostringstream msg; - msg << "[" << d_buffer << ";" << this << "] " - << "input_blocked_callback() WR_idx: " << d_buffer->d_write_index - << " -- WR items: " << d_buffer->nitems_written() - << " -- BUFSIZE: " << d_buffer->d_bufsize << " -- RD_idx: " << min_read_idx; - for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) { - if (idx != min_reader_index) { - msg << " -- OTHER_RDR: " << d_buffer->d_readers[idx]->d_read_index; - } - } - - msg << " -- GAP: " << gap << " -- items_required: " << items_required - << " -- items_avail: " << items_avail; - GR_LOG_DEBUG(d_logger, msg.str()); -#endif - - // Shift existing data down to make room for blocked data at end of buffer - uint32_t move_data_size = d_buffer->d_write_index * d_buffer->d_sizeof_item; - char* dest = d_buffer->d_base + (items_avail * d_buffer->d_sizeof_item); - std::memmove(dest, d_buffer->d_base, move_data_size); - - // Next copy the data from the end of the buffer back to the beginning - uint32_t avail_data_size = items_avail * d_buffer->d_sizeof_item; - char* src = d_buffer->d_base + (min_read_idx * d_buffer->d_sizeof_item); - std::memcpy(d_buffer->d_base, src, avail_data_size); - - // Now adjust write pointer - d_buffer->d_write_index += items_avail; - - // Finally adjust all reader pointers - for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) { - if (idx == min_reader_index) { - d_buffer->d_readers[idx]->d_read_index = 0; - } else { - d_buffer->d_readers[idx]->d_read_index += items_avail; - d_buffer->d_readers[idx]->d_read_index %= d_buffer->d_bufsize; - } - } - - return true; - } + // Update items available before going farther as it could be stale + items_avail = items_available(); - return false; + return d_buffer->input_blocked_callback(items_required, items_avail, d_read_index); } buffer_reader_sm::buffer_reader_sm(buffer_sptr buffer, diff --git a/gnuradio-runtime/lib/buffer_single_mapped.cc b/gnuradio-runtime/lib/buffer_single_mapped.cc index 6024396557..db7959c5b4 100644 --- a/gnuradio-runtime/lib/buffer_single_mapped.cc +++ b/gnuradio-runtime/lib/buffer_single_mapped.cc @@ -19,6 +19,8 @@ #include <gnuradio/thread/thread.h> #include <assert.h> #include <algorithm> +#include <cstdlib> +#include <cstring> #include <iostream> #include <stdexcept> @@ -27,30 +29,18 @@ namespace gr { buffer_single_mapped::buffer_single_mapped(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, block_sptr link, block_sptr buf_owner) - : buffer(BufferMappingType::SingleMapped, + : buffer(buffer_mapping_type::single_mapped, nitems, sizeof_item, downstream_lcm_nitems, + downstream_max_out_mult, link), d_buf_owner(buf_owner), - d_buffer(nullptr, - std::bind(&buffer_single_mapped::deleter, this, std::placeholders::_1)) + d_buffer(nullptr) { - gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_single_mapped"); - if (!allocate_buffer(nitems, sizeof_item, downstream_lcm_nitems)) - throw std::bad_alloc(); - -#ifdef BUFFER_DEBUG - // BUFFER DEBUG - { - std::ostringstream msg; - msg << "[" << this << "] " - << "buffer_single_mapped constructor -- history: " << link->history(); - GR_LOG_DEBUG(d_logger, msg.str()); - } -#endif } buffer_single_mapped::~buffer_single_mapped() {} @@ -59,14 +49,21 @@ buffer_single_mapped::~buffer_single_mapped() {} * Allocates underlying buffer. * returns true iff successful. */ -bool buffer_single_mapped::allocate_buffer(int nitems, - size_t sizeof_item, - uint64_t downstream_lcm_nitems) +bool buffer_single_mapped::allocate_buffer(int nitems) { #ifdef BUFFER_DEBUG int orig_nitems = nitems; #endif + // For single mapped buffers resize the initial size to be at least four + // times the size of the largest of any downstream block's output multiple. + // This helps reduce the number of times the input_block_callback() might be + // called which should help overall performance (particularly if the max + // output multiple is large) at the cost of slightly more buffer space. + if (static_cast<uint32_t>(nitems) < (4 * d_max_reader_output_multiple)) { + nitems = 4 * d_max_reader_output_multiple; + } + // Unlike the double mapped buffer case that can easily wrap back onto itself // for both reads and writes the single mapped case needs to be aware of read // and write granularity and size the underlying buffer accordingly. Otherwise @@ -105,33 +102,37 @@ bool buffer_single_mapped::allocate_buffer(int nitems, #endif // Adjust size so output buffer size is a multiple of the write granularity - if (write_granularity != 1 || downstream_lcm_nitems != 1) { - uint64_t size_align_adjust = GR_LCM(write_granularity, downstream_lcm_nitems); + if (write_granularity != 1 || d_downstream_lcm_nitems != 1) { + uint64_t size_align_adjust = GR_LCM(write_granularity, d_downstream_lcm_nitems); uint64_t remainder = nitems % size_align_adjust; nitems += (remainder > 0) ? (size_align_adjust - remainder) : 0; #ifdef BUFFER_DEBUG std::ostringstream msg; msg << "allocate_buffer()** called nitems: " << orig_nitems - << " -- read_multiple: " << downstream_lcm_nitems + << " -- read_multiple: " << d_downstream_lcm_nitems << " -- write_multiple: " << write_granularity << " -- NEW nitems: " << nitems; GR_LOG_DEBUG(d_logger, msg.str()); #endif } - // Allocate a new custom buffer from the owning block - char* buf = buf_owner()->allocate_custom_buffer(nitems * sizeof_item); - assert(buf != nullptr); - d_buffer.reset(buf); - - d_base = d_buffer.get(); d_bufsize = nitems; - d_downstream_lcm_nitems = downstream_lcm_nitems; + d_downstream_lcm_nitems = d_downstream_lcm_nitems; d_write_multiple = write_granularity; - return true; + // Do the actual allocation(s) with the finalized nitems + return do_allocate_buffer(nitems, d_sizeof_item); +} + +bool buffer_single_mapped::input_blkd_cb_ready(int items_required, + unsigned int read_index) +{ + gr::thread::scoped_lock(*this->mutex()); + + return (((d_bufsize - read_index) < (uint32_t)items_required) && + (d_write_index < read_index)); } bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple) @@ -145,70 +146,6 @@ bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple) ((space_avail / output_multiple) * output_multiple == 0)); } - -bool buffer_single_mapped::output_blocked_callback(int output_multiple, bool force) -{ - uint32_t space_avail = space_available(); - -#ifdef BUFFER_DEBUG - std::ostringstream msg; - msg << "[" << this << "] " - << "output_blocked_callback()*** WR_idx: " << d_write_index - << " -- WR items: " << nitems_written() - << " -- output_multiple: " << output_multiple - << " -- space_avail: " << space_avail << " -- force: " << force; - GR_LOG_DEBUG(d_logger, msg.str()); -#endif - - if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) || - force) { - // Find reader with the smallest read index - uint32_t min_read_idx = d_readers[0]->d_read_index; - for (size_t idx = 1; idx < d_readers.size(); ++idx) { - // Record index of reader with minimum read-index - if (d_readers[idx]->d_read_index < min_read_idx) { - min_read_idx = d_readers[idx]->d_read_index; - } - } - -#ifdef BUFFER_DEBUG - std::ostringstream msg; - msg << "[" << this << "] " - << "output_blocked_callback() WR_idx: " << d_write_index - << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx - << " -- shortcircuit: " - << ((min_read_idx == 0) || (min_read_idx >= d_write_index)) - << " -- to_move_items: " << (d_write_index - min_read_idx) - << " -- space_avail: " << space_avail << " -- force: " << force; - GR_LOG_DEBUG(d_logger, msg.str()); -#endif - - // Make sure we have enough room to start writing back at the beginning - if ((min_read_idx == 0) || (min_read_idx >= d_write_index)) { - return false; - } - - // Determine how much "to be read" data needs to be moved - int to_move_items = d_write_index - min_read_idx; - assert(to_move_items > 0); - uint32_t to_move_bytes = to_move_items * d_sizeof_item; - - // Shift "to be read" data back to the beginning of the buffer - std::memmove(d_base, d_base + (min_read_idx * d_sizeof_item), to_move_bytes); - - // Adjust write index and each reader index - d_write_index -= min_read_idx; - - for (size_t idx = 0; idx < d_readers.size(); ++idx) { - d_readers[idx]->d_read_index -= min_read_idx; - } - - return true; - } - - return false; -} - int buffer_single_mapped::space_available() { if (d_readers.empty()) @@ -233,7 +170,7 @@ int buffer_single_mapped::space_available() // For single mapped buffer there is no wrapping beyond the end of the // buffer #ifdef BUFFER_DEBUG - int thecase = 4; // REMOVE ME - just for debug + int thecase = 0; #endif int space = d_bufsize - d_write_index; @@ -259,15 +196,17 @@ int buffer_single_mapped::space_available() #endif space = min_read_index - d_write_index; // Leave extra space in case the reader gets stuck and needs realignment - { - if ((d_write_index > (d_bufsize / 2)) || - (min_read_index < (d_bufsize / 2))) { + if (d_max_reader_output_multiple > 1) { + if (static_cast<uint32_t>(space) > d_max_reader_output_multiple) { #ifdef BUFFER_DEBUG - thecase = 17; + thecase = 4; #endif - space = 0; + space = space - d_max_reader_output_multiple; } else { - space = (d_bufsize / 2) - d_write_index; +#ifdef BUFFER_DEBUG + thecase = 5; +#endif + space = 0; } } } @@ -278,7 +217,6 @@ int buffer_single_mapped::space_available() } #ifdef BUFFER_DEBUG - // BUFFER DEBUG std::ostringstream msg; msg << "[" << this << "] " << "space_available() called (case: " << thecase @@ -286,6 +224,7 @@ int buffer_single_mapped::space_available() << " -- min_read_index: " << min_read_index << " (" << min_idx_reader->nitems_read() << ") " << " -- space: " << space + << " -- max_reader_out_mult: " << d_max_reader_output_multiple << " (sample delay: " << min_idx_reader->sample_delay() << ")"; GR_LOG_DEBUG(d_logger, msg.str()); #endif @@ -294,4 +233,198 @@ int buffer_single_mapped::space_available() } } +void buffer_single_mapped::update_reader_block_history(unsigned history, int delay) +{ + unsigned old_max = d_max_reader_history; + d_max_reader_history = std::max(d_max_reader_history, history); + if (d_max_reader_history != old_max) { + d_write_index = d_max_reader_history - 1; + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "buffer_single_mapped constructor -- set wr index to: " << d_write_index; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Reset the reader's read index if the buffer's write index has changed. + // Note that "history - 1" is the nzero_preload value passed to + // buffer_add_reader. + for (auto reader : d_readers) { + reader->d_read_index = d_write_index - (reader->link()->history() - 1); + } + } + + // Only attempt to set has history flag if it is not already set + if (!d_has_history) { + // Blocks that set delay may set history to delay + 1 but this is + // not "real" history + d_has_history = ((static_cast<int>(history) - 1) != delay); + } +} + +//------------------------------------------------------------------------------ + +bool buffer_single_mapped::input_blocked_callback_logic(int items_required, + int items_avail, + unsigned read_index, + char* buffer_ptr, + memcpy_func_t memcpy_func, + memmove_func_t memmove_func) +{ +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "input_blocked_callback() WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() << " -- BUFSIZE: " << d_bufsize + << " -- RD_idx: " << read_index << " -- items_required: " << items_required + << " -- items_avail: " << items_avail; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Maybe adjust read pointers from min read index? + // This would mean that *all* readers must be > (passed) the write index + if (((d_bufsize - read_index) < (uint32_t)items_required) && + (d_write_index < read_index)) { + + // Find reader with the smallest read index that is greater than the + // write index + uint32_t min_reader_index = std::numeric_limits<uint32_t>::max(); + uint32_t min_read_idx = std::numeric_limits<uint32_t>::max(); + for (size_t idx = 0; idx < d_readers.size(); ++idx) { + if (d_readers[idx]->d_read_index > d_write_index) { + // Record index of reader with minimum read-index + if (d_readers[idx]->d_read_index < min_read_idx) { + min_read_idx = d_readers[idx]->d_read_index; + min_reader_index = idx; + } + } + } + + // Note items_avail might be zero, that's okay. + items_avail += read_index - min_read_idx; + int gap = min_read_idx - d_write_index; + if (items_avail > gap) { + return false; + } + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "input_blocked_callback() WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() << " -- BUFSIZE: " << d_bufsize + << " -- RD_idx: " << min_read_idx; + for (size_t idx = 0; idx < d_readers.size(); ++idx) { + if (idx != min_reader_index) { + msg << " -- OTHER_RDR: " << d_readers[idx]->d_read_index; + } + } + + msg << " -- GAP: " << gap << " -- items_required: " << items_required + << " -- items_avail: " << items_avail; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Shift existing data down to make room for blocked data at end of buffer + uint32_t move_data_size = d_write_index * d_sizeof_item; + char* dest = buffer_ptr + (items_avail * d_sizeof_item); + memmove_func(dest, buffer_ptr, move_data_size); + + // Next copy the data from the end of the buffer back to the beginning + uint32_t avail_data_size = items_avail * d_sizeof_item; + char* src = buffer_ptr + (min_read_idx * d_sizeof_item); + memcpy_func(buffer_ptr, src, avail_data_size); + + // Now adjust write pointer + d_write_index += items_avail; + + // Finally adjust all reader pointers + for (size_t idx = 0; idx < d_readers.size(); ++idx) { + if (idx == min_reader_index) { + d_readers[idx]->d_read_index = 0; + } else { + d_readers[idx]->d_read_index += items_avail; + d_readers[idx]->d_read_index %= d_bufsize; + } + } + + return true; + } + + return false; +} + +bool buffer_single_mapped::output_blocked_callback_logic(int output_multiple, + bool force, + char* buffer_ptr, + memmove_func_t memmove_func) +{ + uint32_t space_avail = space_available(); + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "output_blocked_callback()*** WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() + << " -- output_multiple: " << output_multiple + << " -- space_avail: " << space_avail << " -- force: " << force; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) || + force) { + // Find reader with the smallest read index + uint32_t min_read_idx = d_readers[0]->d_read_index; + uint64_t min_read_idx_nitems = d_readers[0]->nitems_read(); + for (size_t idx = 1; idx < d_readers.size(); ++idx) { + // Record index of reader with minimum read-index + if (d_readers[idx]->d_read_index < min_read_idx) { + min_read_idx = d_readers[idx]->d_read_index; + min_read_idx_nitems = d_readers[idx]->nitems_read(); + } + } + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "output_blocked_callback() WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx + << " -- RD items: " << min_read_idx_nitems << " -- shortcircuit: " + << ((min_read_idx == 0) || (min_read_idx > d_write_index) || + (min_read_idx == d_write_index && + min_read_idx_nitems != nitems_written())) + << " -- to_move_items: " << (d_write_index - min_read_idx) + << " -- space_avail: " << space_avail << " -- force: " << force; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Make sure we have enough room to start writing back at the beginning + if ((min_read_idx == 0) || (min_read_idx > d_write_index) || + (min_read_idx == d_write_index && min_read_idx_nitems != nitems_written())) { + return false; + } + + // Determine how much "to be read" data needs to be moved + int to_move_items = d_write_index - min_read_idx; + if (to_move_items > 0) { + uint32_t to_move_bytes = to_move_items * d_sizeof_item; + + // Shift "to be read" data back to the beginning of the buffer + memmove_func( + buffer_ptr, buffer_ptr + (min_read_idx * d_sizeof_item), to_move_bytes); + } + + // Adjust write index and each reader index + d_write_index -= min_read_idx; + + for (size_t idx = 0; idx < d_readers.size(); ++idx) { + d_readers[idx]->d_read_index -= min_read_idx; + } + + return true; + } + + return false; +} + } /* namespace gr */ diff --git a/gnuradio-runtime/lib/buffer_type.cc b/gnuradio-runtime/lib/buffer_type.cc deleted file mode 100644 index b667eded1e..0000000000 --- a/gnuradio-runtime/lib/buffer_type.cc +++ /dev/null @@ -1,18 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2020 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ -#include <gnuradio/buffer_type.h> - -namespace gr { - -uint32_t buffer_type_base::s_nextId = 0; -std::mutex buffer_type_base::s_mutex; - - -} /* namespace gr */
\ No newline at end of file diff --git a/gnuradio-runtime/lib/flat_flowgraph.cc b/gnuradio-runtime/lib/flat_flowgraph.cc index b9986f8370..ef96ef21ac 100644 --- a/gnuradio-runtime/lib/flat_flowgraph.cc +++ b/gnuradio-runtime/lib/flat_flowgraph.cc @@ -15,6 +15,7 @@ #include "flat_flowgraph.h" #include <gnuradio/block_detail.h> #include <gnuradio/buffer.h> +#include <gnuradio/buffer_double_mapped.h> #include <gnuradio/buffer_reader.h> #include <gnuradio/buffer_type.h> #include <gnuradio/integer_math.h> @@ -87,6 +88,7 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block) // Determine the downstream max per output port std::vector<int> downstream_max_nitems(noutputs, 0); std::vector<uint64_t> downstream_lcm_nitems(noutputs, 1); + std::vector<uint32_t> downstream_max_out_mult(noutputs, 1); #ifdef BUFFER_DEBUG std::ostringstream msg; @@ -96,6 +98,7 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block) for (int i = 0; i < noutputs; i++) { int nitems = 0; uint64_t lcm_nitems = 1; + uint32_t max_out_multiple = 1; basic_block_vector_t downstream_blocks = calc_downstream_blocks(grblock, i); for (basic_block_viter_t blk = downstream_blocks.begin(); blk != downstream_blocks.end(); @@ -116,8 +119,8 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block) double decimation = (1.0 / dgrblock->relative_rate()); int multiple = dgrblock->output_multiple(); int history = dgrblock->history(); - nitems = - std::max(nitems, static_cast<int>(2 * (decimation * multiple + history))); + nitems = std::max( + nitems, static_cast<int>(2 * (decimation * multiple + (history - 1)))); // Calculate the LCM of downstream reader nitems #ifdef BUFFER_DEBUG @@ -131,6 +134,7 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block) (uint64_t)(dgrblock->fixed_rate_noutput_to_ninput(1) - (dgrblock->history() - 1))); } + if (dgrblock->relative_rate() != 1.0) { // Relative rate lcm_nitems = GR_LCM(lcm_nitems, dgrblock->relative_rate_d()); @@ -141,6 +145,10 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block) lcm_nitems = 1; } + if (static_cast<uint32_t>(multiple) > max_out_multiple) { + max_out_multiple = multiple; + } + #ifdef BUFFER_DEBUG msg.str(""); msg << " NINPUT_ITEMS: " << nitems; @@ -161,11 +169,15 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block) } downstream_max_nitems[i] = nitems; downstream_lcm_nitems[i] = lcm_nitems; + downstream_max_out_mult[i] = max_out_multiple; } // Allocate the block detail and necessary buffers - grblock->allocate_detail( - ninputs, noutputs, downstream_max_nitems, downstream_lcm_nitems); + grblock->allocate_detail(ninputs, + noutputs, + downstream_max_nitems, + downstream_lcm_nitems, + downstream_max_out_mult); } void flat_flowgraph::connect_block_inputs(basic_block_sptr block) @@ -189,26 +201,46 @@ void flat_flowgraph::connect_block_inputs(basic_block_sptr block) if (!src_grblock) throw std::runtime_error("connect_block_inputs found non-gr::block"); + // In order to determine the buffer context, we need to examine both + // the upstream and the downstream buffer_types + buffer_type src_buf_type = + src_grblock->output_signature()->stream_buffer_type(src_port); + buffer_type dest_buf_type = + grblock->input_signature()->stream_buffer_type(dst_port); + + buffer_context context; + if (src_buf_type == buffer_double_mapped::type && + dest_buf_type == buffer_double_mapped::type) { + context = buffer_context::HOST_TO_HOST; + } else if (src_buf_type != buffer_double_mapped::type && + dest_buf_type == buffer_double_mapped::type) { + context = buffer_context::DEVICE_TO_HOST; + } else if (src_buf_type == buffer_double_mapped::type && + dest_buf_type != buffer_double_mapped::type) { + context = buffer_context::HOST_TO_DEVICE; + } else if (src_buf_type != buffer_double_mapped::type && + dest_buf_type != buffer_double_mapped::type) { + context = buffer_context::DEVICE_TO_DEVICE; + } + buffer_sptr src_buffer; - buffer_type_t src_buf_type = src_grblock->get_buffer_type(); - buffer_type_t dest_buf_type = grblock->get_buffer_type(); - if (dest_buf_type == buftype_DEFAULT_NON_CUSTOM::get() || + if (dest_buf_type == buffer_double_mapped::type || dest_buf_type == src_buf_type) { // The block is not using a custom buffer OR the block and the upstream // block both use the same kind of custom buffer src_buffer = src_grblock->detail()->output(src_port); } else { - if (dest_buf_type != buftype_DEFAULT_NON_CUSTOM::get() && - src_buf_type == buftype_DEFAULT_NON_CUSTOM::get()) { + if (dest_buf_type != buffer_double_mapped::type && + src_buf_type == buffer_double_mapped::type) { // The block uses a custom buffer but the upstream block does not // therefore the upstream block's buffer can be replaced with the // type of buffer that the block needs std::ostringstream msg; msg << "Block: " << grblock->identifier() - << "replacing upstream block: " << src_grblock->identifier() + << " replacing upstream block: " << src_grblock->identifier() << " buffer with a custom buffer"; - GR_LOG_DEBUG(d_debug_logger, msg.str()); - src_buffer = src_grblock->replace_buffer(src_port, grblock); + GR_LOG_DEBUG(d_logger, msg.str()); + src_buffer = src_grblock->replace_buffer(src_port, dst_port, grblock); } else { // Both the block and upstream block use incompatible buffer types // which is not currently allowed @@ -223,9 +255,13 @@ void flat_flowgraph::connect_block_inputs(basic_block_sptr block) } } - GR_LOG_DEBUG(d_debug_logger, - "Setting input " + std::to_string(dst_port) + " from edge " + - (*e).identifier()); + // Set buffer's context + src_buffer->set_context(context); + + std::ostringstream msg; + msg << "Setting input " << dst_port << " from edge " << (*e).identifier() + << " context: " << context; + GR_LOG_DEBUG(d_debug_logger, msg.str()); detail->set_input(dst_port, buffer_add_reader(src_buffer, @@ -341,7 +377,6 @@ void flat_flowgraph::merge_connections(flat_flowgraph_sptr old_ffg) // Now deal with the fact that the block details might have // changed numbers of inputs and outputs vs. in the old // flowgraph. - block->detail()->reset_nitem_counters(); block->detail()->clear_tags(); } diff --git a/gnuradio-runtime/lib/host_buffer.cc b/gnuradio-runtime/lib/host_buffer.cc new file mode 100644 index 0000000000..6e1a1d1003 --- /dev/null +++ b/gnuradio-runtime/lib/host_buffer.cc @@ -0,0 +1,255 @@ +/* -*- c++ -*- */ +/* + * Copyright 2021 BlackLynx Inc.. + * + * SPDX-License-Identifier: GPL-3.0-or-later + */ + +#include <cstring> +#include <sstream> +#include <stdexcept> + +#include <gnuradio/block.h> +#include <gnuradio/host_buffer.h> + +namespace gr { + +buffer_type host_buffer::type(buftype_HOST_BUFFER{}); + +void* host_buffer::device_memcpy(void* dest, const void* src, std::size_t count) +{ + // There is no spoon...er... device so fake it out using regular memcpy + return std::memcpy(dest, src, count); +} + +void* host_buffer::device_memmove(void* dest, const void* src, std::size_t count) +{ + // There is no spoon...er... device so fake it out using regular memmmove + return std::memmove(dest, src, count); +} + + +host_buffer::host_buffer(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, + block_sptr link, + block_sptr buf_owner) + : buffer_single_mapped(nitems, + sizeof_item, + downstream_lcm_nitems, + downstream_max_out_mult, + link, + buf_owner), + d_device_base(nullptr) +{ + gr::configure_default_loggers(d_logger, d_debug_logger, "host_buffer"); + if (!allocate_buffer(nitems)) + throw std::bad_alloc(); +} + +host_buffer::~host_buffer() {} + +void host_buffer::post_work(int nitems) +{ +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "host_buffer [" << d_context << "] -- post_work: " << nitems; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + if (nitems <= 0) { + return; + } + + // NOTE: when this function is called the write pointer has not yet been + // advanced so it can be used directly as the source ptr + switch (d_context) { + case buffer_context::HOST_TO_DEVICE: { + // Copy data from host buffer to device buffer + void* dest_ptr = &d_device_base[d_write_index * d_sizeof_item]; + device_memcpy(dest_ptr, write_pointer(), nitems * d_sizeof_item); + } break; + + case buffer_context::DEVICE_TO_HOST: { + // Copy data from device buffer to host buffer + void* dest_ptr = &d_base[d_write_index * d_sizeof_item]; + device_memcpy(dest_ptr, write_pointer(), nitems * d_sizeof_item); + } break; + + case buffer_context::DEVICE_TO_DEVICE: + // No op + break; + + default: + std::ostringstream msg; + msg << "Unexpected context for host_buffer: " << d_context; + GR_LOG_ERROR(d_logger, msg.str()); + throw std::runtime_error(msg.str()); + } +} + +bool host_buffer::do_allocate_buffer(size_t final_nitems, size_t sizeof_item) +{ +#ifdef BUFFER_DEBUG + { + std::ostringstream msg; + msg << "[" << this << "] " + << "host_buffer constructor -- nitems: " << final_nitems; + GR_LOG_DEBUG(d_logger, msg.str()); + } +#endif + + // This is the host buffer + d_buffer.reset(new char[final_nitems * sizeof_item]()); + d_base = d_buffer.get(); + + // This is the simulated device buffer + d_device_buf.reset(new char[final_nitems * sizeof_item]()); + d_device_base = d_device_buf.get(); + + return true; +} + +void* host_buffer::write_pointer() +{ + void* ptr = nullptr; + switch (d_context) { + case buffer_context::HOST_TO_DEVICE: + // Write into host buffer + ptr = &d_base[d_write_index * d_sizeof_item]; + break; + + case buffer_context::DEVICE_TO_HOST: + case buffer_context::DEVICE_TO_DEVICE: + // Write into "device" buffer + ptr = &d_device_base[d_write_index * d_sizeof_item]; + break; + + default: + std::ostringstream msg; + msg << "Unexpected context for host_buffer: " << d_context; + GR_LOG_ERROR(d_logger, msg.str()); + throw std::runtime_error(msg.str()); + } + + return ptr; +} + +const void* host_buffer::_read_pointer(unsigned int read_index) +{ + void* ptr = nullptr; + switch (d_context) { + case buffer_context::HOST_TO_DEVICE: + case buffer_context::DEVICE_TO_DEVICE: + // Read from "device" buffer + ptr = &d_device_base[read_index * d_sizeof_item]; + break; + + case buffer_context::DEVICE_TO_HOST: + // Read from host buffer + ptr = &d_base[read_index * d_sizeof_item]; + break; + + default: + std::ostringstream msg; + msg << "Unexpected context for host_buffer: " << d_context; + GR_LOG_ERROR(d_logger, msg.str()); + throw std::runtime_error(msg.str()); + } + + return ptr; +} + +bool host_buffer::input_blocked_callback(int items_required, + int items_avail, + unsigned read_index) +{ +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "host_buffer [" << d_context << "] -- input_blocked_callback"; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + bool rc = false; + switch (d_context) { + case buffer_context::HOST_TO_DEVICE: + case buffer_context::DEVICE_TO_DEVICE: + // Adjust "device" buffer + rc = input_blocked_callback_logic(items_required, + items_avail, + read_index, + d_device_base, + host_buffer::device_memcpy, + host_buffer::device_memmove); + break; + + case buffer_context::DEVICE_TO_HOST: + case buffer_context::HOST_TO_HOST: + // Adjust host buffer + rc = input_blocked_callback_logic( + items_required, items_avail, read_index, d_base, std::memcpy, std::memmove); + break; + + default: + std::ostringstream msg; + msg << "Unexpected context for host_buffer: " << d_context; + GR_LOG_ERROR(d_logger, msg.str()); + throw std::runtime_error(msg.str()); + } + + return rc; +} + +bool host_buffer::output_blocked_callback(int output_multiple, bool force) +{ +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "host_buffer [" << d_context << "] -- output_blocked_callback"; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + bool rc = false; + switch (d_context) { + case buffer_context::HOST_TO_DEVICE: + case buffer_context::HOST_TO_HOST: + // Adjust host buffer + rc = output_blocked_callback_logic(output_multiple, force, d_base, std::memmove); + break; + + case buffer_context::DEVICE_TO_HOST: + case buffer_context::DEVICE_TO_DEVICE: + // Adjust "device" buffer + rc = output_blocked_callback_logic( + output_multiple, force, d_device_base, host_buffer::device_memmove); + break; + + default: + std::ostringstream msg; + msg << "Unexpected context for host_buffer: " << d_context; + GR_LOG_ERROR(d_logger, msg.str()); + throw std::runtime_error(msg.str()); + } + + return rc; +} + +buffer_sptr host_buffer::make_host_buffer(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, + block_sptr link, + block_sptr buf_owner) +{ + return buffer_sptr(new host_buffer(nitems, + sizeof_item, + downstream_lcm_nitems, + downstream_max_out_mult, + link, + buf_owner)); +} + +} /* namespace gr */ diff --git a/gnuradio-runtime/lib/io_signature.cc b/gnuradio-runtime/lib/io_signature.cc index 507268d5ce..e953c7eb5a 100644 --- a/gnuradio-runtime/lib/io_signature.cc +++ b/gnuradio-runtime/lib/io_signature.cc @@ -22,47 +22,69 @@ gr::io_signature::sptr io_signature::makev(int min_streams, int max_streams, const std::vector<int>& sizeof_stream_items) { + gr_vector_buffer_type buftypes(sizeof_stream_items.size(), + buffer_double_mapped::type); return gr::io_signature::sptr( - new io_signature(min_streams, max_streams, sizeof_stream_items)); + new io_signature(min_streams, max_streams, sizeof_stream_items, buftypes)); } -gr::io_signature::sptr -io_signature::make(int min_streams, int max_streams, int sizeof_stream_item) +gr::io_signature::sptr io_signature::makev(int min_streams, + int max_streams, + const std::vector<int>& sizeof_stream_items, + gr_vector_buffer_type buftypes) { - std::vector<int> sizeof_items(1); - sizeof_items[0] = sizeof_stream_item; - return io_signature::makev(min_streams, max_streams, sizeof_items); + return gr::io_signature::sptr( + new io_signature(min_streams, max_streams, sizeof_stream_items, buftypes)); +} + +gr::io_signature::sptr io_signature::make(int min_streams, + int max_streams, + int sizeof_stream_item, + buffer_type buftype) +{ + std::vector<int> sizeof_items{ sizeof_stream_item }; + gr_vector_buffer_type buftypes{ buftype }; + return io_signature::makev(min_streams, max_streams, sizeof_items, buftypes); } gr::io_signature::sptr io_signature::make2(int min_streams, int max_streams, int sizeof_stream_item1, - int sizeof_stream_item2) + int sizeof_stream_item2, + buffer_type buftype1, + buffer_type buftype2) { - std::vector<int> sizeof_items(2); - sizeof_items[0] = sizeof_stream_item1; - sizeof_items[1] = sizeof_stream_item2; - return io_signature::makev(min_streams, max_streams, sizeof_items); + std::vector<int> sizeof_items{ sizeof_stream_item1, sizeof_stream_item2 }; + gr_vector_buffer_type buftypes{ buftype1, buftype2 }; + return io_signature::makev(min_streams, max_streams, sizeof_items, buftypes); } gr::io_signature::sptr io_signature::make3(int min_streams, int max_streams, int sizeof_stream_item1, int sizeof_stream_item2, - int sizeof_stream_item3) + int sizeof_stream_item3, + buffer_type buftype1, + buffer_type buftype2, + buffer_type buftype3) { - std::vector<int> sizeof_items(3); - sizeof_items[0] = sizeof_stream_item1; - sizeof_items[1] = sizeof_stream_item2; - sizeof_items[2] = sizeof_stream_item3; - return io_signature::makev(min_streams, max_streams, sizeof_items); + std::vector<int> sizeof_items{ sizeof_stream_item1, + sizeof_stream_item2, + sizeof_stream_item3 }; + gr_vector_buffer_type buftypes{ buftype1, buftype2, buftype3 }; + return io_signature::makev(min_streams, max_streams, sizeof_items, buftypes); } // ------------------------------------------------------------------------ io_signature::io_signature(int min_streams, int max_streams, - const std::vector<int>& sizeof_stream_items) + const std::vector<int>& sizeof_stream_items, + gr_vector_buffer_type buftypes) + : d_min_streams(min_streams), + d_max_streams(max_streams), + d_sizeof_stream_item(sizeof_stream_items), + d_stream_buffer_type(buftypes) { if (min_streams < 0 || (max_streams != IO_INFINITE && max_streams < min_streams)) throw std::invalid_argument("gr::io_signature(1)"); @@ -75,10 +97,6 @@ io_signature::io_signature(int min_streams, if (max_streams != 0 && sizeof_stream_items[i] < 1) throw std::invalid_argument("gr::io_signature(3)"); } - - d_min_streams = min_streams; - d_max_streams = max_streams; - d_sizeof_stream_item = sizeof_stream_items; } io_signature::~io_signature() {} @@ -97,4 +115,14 @@ std::vector<int> io_signature::sizeof_stream_items() const return d_sizeof_stream_item; } +buffer_type io_signature::stream_buffer_type(size_t index) const +{ + return d_stream_buffer_type[std::min(index, d_stream_buffer_type.size() - 1)]; +} + +gr_vector_buffer_type io_signature::stream_buffer_types() const +{ + return d_stream_buffer_type; +} + } /* namespace gr */ diff --git a/gnuradio-runtime/lib/qa_buffer.cc b/gnuradio-runtime/lib/qa_buffer.cc index cefe548338..1c97fd070d 100644 --- a/gnuradio-runtime/lib/qa_buffer.cc +++ b/gnuradio-runtime/lib/qa_buffer.cc @@ -43,7 +43,7 @@ static void t0_body() int counter = 0; gr::buffer_sptr buf( - gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr())); int last_sa; int sa; @@ -78,7 +78,7 @@ static void t1_body() int read_counter = 0; gr::buffer_sptr buf( - gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr())); gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr())); int sa; @@ -150,7 +150,7 @@ static void t2_body() int nitems = (64 * (1L << 10)) / sizeof(int); // 64K worth of ints gr::buffer_sptr buf( - gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr())); gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr())); int read_counter = 0; @@ -216,7 +216,7 @@ static void t3_body() static const int N = 5; gr::buffer_sptr buf( - gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr())); gr::buffer_reader_sptr reader[N]; int read_counter[N]; int write_counter = 0; diff --git a/gnuradio-runtime/lib/qa_host_buffer.cc b/gnuradio-runtime/lib/qa_host_buffer.cc new file mode 100644 index 0000000000..e8f29afe8d --- /dev/null +++ b/gnuradio-runtime/lib/qa_host_buffer.cc @@ -0,0 +1,334 @@ +/* -*- c++ -*- */ +/* + * Copyright 2021 BlackLynx Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <gnuradio/block.h> +#include <gnuradio/buffer_context.h> +#include <gnuradio/buffer_reader.h> +#include <gnuradio/host_buffer.h> +#include <gnuradio/random.h> +#include <boost/test/unit_test.hpp> +#include <cstdlib> +#include <iostream> + +// This is a trivial mocked up block inspired by gr::blocks::nop that is used +// only as a placeholder for testing host_buffer below. +class nop : public gr::block +{ +public: + typedef std::shared_ptr<nop> sptr; + static sptr make(size_t sizeof_stream_item) + { + return gnuradio::make_block_sptr<nop>(sizeof_stream_item); + } + + nop(size_t sizeof_stream_item) + : block("nop", + gr::io_signature::make(0, -1, sizeof_stream_item), + gr::io_signature::make(0, -1, sizeof_stream_item)) + { + } + + ~nop() override {} + + int general_work(int noutput_items, + gr_vector_int& ninput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items) override + { + // eat any input that's available + for (unsigned i = 0; i < ninput_items.size(); i++) + consume(i, ninput_items[i]); + + return noutput_items; + } +}; + + +// ---------------------------------------------------------------------------- +// Basic checks for buffer_single_mapped using the host_buffer implementation +// of the interface for testing. +// ---------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(t0) +{ + int nitems = 65536 / sizeof(int); + gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder + + gr::buffer_sptr buf( + gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop)); + buf->set_context(gr::buffer_context::HOST_TO_HOST); + + gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop)); + + BOOST_CHECK(buf->space_available() == nitems); + BOOST_CHECK(rdr1->items_available() == 0); + + for (int idx = 1; idx <= 16; ++idx) { + buf->update_write_pointer(1000); + BOOST_CHECK(buf->space_available() == (nitems - (idx * 1000))); + + BOOST_CHECK(rdr1->items_available() == (idx * 1000)); + } + + BOOST_CHECK(buf->space_available() == 384); + + buf->update_write_pointer(buf->space_available()); + BOOST_CHECK(buf->space_available() == 0); + BOOST_CHECK(rdr1->items_available() == nitems); + BOOST_CHECK(buf->space_available() == 0); +} + +// ---------------------------------------------------------------------------- +// Basic checks for buffer_single_mapped using the host_buffer implementation +// of the interface for testing. +// ---------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(t1) +{ + int nitems = 65536 / sizeof(int); + gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder + + gr::buffer_sptr buf( + gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop)); + buf->set_context(gr::buffer_context::HOST_TO_HOST); + + gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop)); + + int space = buf->space_available(); + BOOST_CHECK(nitems == space); + + BOOST_CHECK(rdr1->items_available() == 0); + + buf->update_write_pointer(nitems); + BOOST_CHECK(buf->space_available() == 0); + BOOST_CHECK(rdr1->items_available() == nitems); + + for (int idx = 1; idx <= 16; ++idx) { + rdr1->update_read_pointer(1000); + BOOST_CHECK(rdr1->items_available() == (nitems - (idx * 1000))); + + space = buf->space_available(); + BOOST_CHECK(space == (idx * 1000)); + } + + BOOST_CHECK(rdr1->items_available() == 384); + rdr1->update_read_pointer(384); + BOOST_CHECK(rdr1->items_available() == 0); +} + +// ---------------------------------------------------------------------------- +// Basic check reader/write wrapping of buffer_single_mapped with 1 reader. +// ---------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(t2) +{ + int nitems = 65536 / sizeof(int); + gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder + + gr::buffer_sptr buf( + gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop)); + buf->set_context(gr::buffer_context::HOST_TO_HOST); + + gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop)); + + int space = buf->space_available(); + BOOST_CHECK(nitems == space); + BOOST_CHECK(rdr1->items_available() == 0); + + buf->update_write_pointer(nitems); + BOOST_CHECK(buf->space_available() == 0); + + for (int idx = 1; idx <= 16; ++idx) { + rdr1->update_read_pointer(1000); + BOOST_CHECK(rdr1->items_available() == (nitems - (idx * 1000))); + + space = buf->space_available(); + + if (idx <= 9) + BOOST_CHECK(space == (idx * 1000)); + else + BOOST_CHECK(space == ((idx * 1000) - (nitems / 2))); + + if (idx == 9) { + buf->update_write_pointer(nitems / 2); + } + } + + // At this point we can only read up until the end of the buffer even though + // additional data is available at the beginning of the buffer + BOOST_CHECK(rdr1->items_available() == 384); + rdr1->update_read_pointer(384); + + // Now the (nitems / 2) at the beginning of the buffer should be available + BOOST_CHECK(rdr1->items_available() == (nitems / 2)); + + for (int idx = 0; idx < 4; ++idx) + rdr1->update_read_pointer(1024); + + BOOST_CHECK(buf->space_available() == (nitems / 2)); + BOOST_CHECK(rdr1->items_available() == (nitems / 4)); + + for (int idx = 0; idx < 4; ++idx) + rdr1->update_read_pointer(1000); + + BOOST_CHECK(buf->space_available() == (nitems / 2)); + BOOST_CHECK(rdr1->items_available() == 96); + + rdr1->update_read_pointer(96); + BOOST_CHECK(rdr1->items_available() == 0); + + BOOST_CHECK(buf->space_available() == (nitems / 2)); +} + +// ---------------------------------------------------------------------------- +// Basic check reader/write wrapping of buffer_single_mapped with 2 readers. +// ---------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(t3) +{ + int nitems = 65536 / sizeof(int); + gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder + + gr::buffer_sptr buf( + gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop)); + buf->set_context(gr::buffer_context::HOST_TO_HOST); + + gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop)); + gr::buffer_reader_sptr rdr2(gr::buffer_add_reader(buf, 0, nop)); + + int space = buf->space_available(); + BOOST_CHECK(nitems == space); + BOOST_CHECK(rdr1->items_available() == 0); + BOOST_CHECK(rdr2->items_available() == 0); + + buf->update_write_pointer(nitems); + BOOST_CHECK(buf->space_available() == 0); + BOOST_CHECK(rdr1->items_available() == nitems); + BOOST_CHECK(rdr2->items_available() == nitems); + + for (int idx = 1; idx <= 16; ++idx) { + rdr1->update_read_pointer(1000); + BOOST_CHECK(rdr1->items_available() == (nitems - (idx * 1000))); + + // Reader 2 hasn't read anything so space available should remain 0 + BOOST_CHECK(buf->space_available() == 0); + } + + int last_rdr1_available = rdr1->items_available(); + int increment = last_rdr1_available / 4; + + for (int idx = 1; idx <= 16; ++idx) { + rdr2->update_read_pointer(1000); + BOOST_CHECK(rdr2->items_available() == (nitems - (idx * 1000))); + + BOOST_CHECK(rdr1->items_available() == last_rdr1_available); + if (idx % 4 == 0) { + rdr1->update_read_pointer(increment); + BOOST_CHECK(rdr1->items_available() == (last_rdr1_available - increment)); + last_rdr1_available = rdr1->items_available(); + } + + BOOST_CHECK(buf->space_available() == (idx * 1000)); + } +} + +// ---------------------------------------------------------------------------- +// Basic check of output blocked callback +// ---------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(t4) +{ + int nitems = 65536 / sizeof(int); + gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder + + gr::buffer_sptr buf( + gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop)); + buf->set_context(gr::buffer_context::HOST_TO_HOST); + + gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop)); + + BOOST_CHECK(nitems == buf->space_available()); + BOOST_CHECK(rdr1->items_available() == 0); + + buf->update_write_pointer(nitems / 2); + BOOST_CHECK(buf->space_available() == (nitems / 2)); + BOOST_CHECK(rdr1->items_available() == (nitems / 2)); + + rdr1->update_read_pointer(nitems / 2); + BOOST_CHECK(buf->space_available() == (nitems / 2)); + BOOST_CHECK(rdr1->items_available() == 0); + + buf->update_write_pointer(8000); + BOOST_CHECK(buf->space_available() == 192); + + bool ready = buf->output_blkd_cb_ready(200); + BOOST_CHECK(ready == true); + + bool success = buf->output_blocked_callback(200); + BOOST_CHECK(success == true); + BOOST_CHECK(buf->space_available() == 8384); + BOOST_CHECK(rdr1->items_available() == 8000); + + rdr1->update_read_pointer(4000); + BOOST_CHECK(buf->space_available() == 8384); + BOOST_CHECK(rdr1->items_available() == 4000); + + buf->update_write_pointer(4000); + BOOST_CHECK(buf->space_available() == 4384); + BOOST_CHECK(rdr1->items_available() == 8000); + + rdr1->update_read_pointer(8000); + BOOST_CHECK(buf->space_available() == 4384); + BOOST_CHECK(rdr1->items_available() == 0); +} + +// ---------------------------------------------------------------------------- +// Basic check of input blocked callback +// ---------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(t5) +{ + int nitems = 65536 / sizeof(int); + gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder + + gr::buffer_sptr buf( + gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop)); + buf->set_context(gr::buffer_context::HOST_TO_HOST); + + gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop)); + + BOOST_CHECK(nitems == buf->space_available()); + BOOST_CHECK(rdr1->items_available() == 0); + + buf->update_write_pointer(16000); + BOOST_CHECK(buf->space_available() == 384); + BOOST_CHECK(rdr1->items_available() == 16000); + + rdr1->update_read_pointer(16000); + BOOST_CHECK(buf->space_available() == 384); + BOOST_CHECK(rdr1->items_available() == 0); + + buf->update_write_pointer(384); + BOOST_CHECK(buf->space_available() == 16000); + BOOST_CHECK(rdr1->items_available() == 384); + + buf->update_write_pointer(116); + BOOST_CHECK(buf->space_available() == 15884); + BOOST_CHECK(rdr1->items_available() == 384); + + bool ready = rdr1->input_blkd_cb_ready(400); + BOOST_CHECK(ready == true); + + bool success = rdr1->input_blocked_callback(400, rdr1->items_available()); + BOOST_CHECK(success == true); + BOOST_CHECK(rdr1->items_available() == 500); + + rdr1->update_read_pointer(500); + BOOST_CHECK(buf->space_available() == 15884); + BOOST_CHECK(rdr1->items_available() == 0); +} |