diff options
author | David Sorber <david.sorber@blacklynx.tech> | 2021-05-12 08:59:21 -0400 |
---|---|---|
committer | mormj <34754695+mormj@users.noreply.github.com> | 2021-10-25 11:27:01 -0400 |
commit | 788827ae116bef871e144abd39b1e4482208eabe (patch) | |
tree | dcfee04a77db5bb3c8042be5b0b95c54bf8759c9 /gnuradio-runtime/include/gnuradio/buffer.h | |
parent | b8713810a2d07ac1a632bd7bfb23f3f48f67e222 (diff) |
runtime: Custom Buffer/Accelerator Device Support - Milestone 1
Custom Buffer/Accelerator Device Support - Milestone 1 changes:
* Refactored existing single mapped buffer code and created single
mapped buffer abstraction; wrapping within single mapped buffers
is handled explicitly by input blocked and output blocked
callbacks that are called from block_executor
* Added simple custom buffer allocation interface (NOTE: this
interface will change for milestone 2)
* Accelerated blocks are still responsible for data transfer but the
custom buffer interface eliminates the double copy problem
Signed-off-by: David Sorber <david.sorber@blacklynx.tech>
Diffstat (limited to 'gnuradio-runtime/include/gnuradio/buffer.h')
-rw-r--r-- | gnuradio-runtime/include/gnuradio/buffer.h | 275 |
1 files changed, 120 insertions, 155 deletions
diff --git a/gnuradio-runtime/include/gnuradio/buffer.h b/gnuradio-runtime/include/gnuradio/buffer.h index 55313df6c0..037fade172 100644 --- a/gnuradio-runtime/include/gnuradio/buffer.h +++ b/gnuradio-runtime/include/gnuradio/buffer.h @@ -12,16 +12,24 @@ #define INCLUDED_GR_RUNTIME_BUFFER_H #include <gnuradio/api.h> +#include <gnuradio/custom_lock.h> #include <gnuradio/logger.h> #include <gnuradio/runtime_types.h> #include <gnuradio/tags.h> #include <gnuradio/thread/thread.h> +#include <boost/weak_ptr.hpp> +#include <iostream> #include <map> #include <memory> + namespace gr { class vmcircbuf; +class buffer_reader; +class buffer_reader_sm; + +enum class BufferMappingType { DoubleMapped, SingleMapped }; /*! * \brief Allocate a buffer that holds at least \p nitems of size \p sizeof_item. @@ -36,13 +44,15 @@ class vmcircbuf; */ GR_RUNTIME_API buffer_sptr make_buffer(int nitems, size_t sizeof_item, - block_sptr link = block_sptr()); + uint64_t downstream_lcm_nitems, + block_sptr link = block_sptr(), + block_sptr buf_owner = block_sptr()); /*! * \brief Single writer, multiple reader fifo. * \ingroup internal */ -class GR_RUNTIME_API buffer +class GR_RUNTIME_API buffer : public custom_lock_if { public: gr::logger_ptr d_logger; @@ -51,14 +61,19 @@ public: virtual ~buffer(); /*! + * \brief return the buffer's mapping type + */ + BufferMappingType get_mapping_type() { return d_buf_map_type; } + + /*! * \brief return number of items worth of space available for writing */ - int space_available(); + virtual int space_available() = 0; /*! * \brief return size of this buffer in items */ - int bufsize() const { return d_bufsize; } + unsigned int bufsize() const { return d_bufsize; } /*! * \brief return the base address of the buffer @@ -71,7 +86,7 @@ public: * The return value points at space that can hold at least * space_available() items. */ - void* write_pointer(); + virtual void* write_pointer(); /*! * \brief tell buffer that we wrote \p nitems into it @@ -93,10 +108,22 @@ public: uint64_t nitems_written() { return d_abs_write_offset; } - void reset_nitem_counter() { d_abs_write_offset = 0; } + void reset_nitem_counter() + { + d_write_index = 0; + d_abs_write_offset = 0; + } size_t get_sizeof_item() { return d_sizeof_item; } + uint64_t get_downstream_lcm_nitems() { return d_downstream_lcm_nitems; } + + virtual void update_reader_block_history(unsigned history, int delay) + { + d_max_reader_history = std::max(d_max_reader_history, history); + d_has_history = (d_max_reader_history > 1); + } + /*! * \brief Adds a new tag to the buffer. * @@ -139,12 +166,67 @@ public: return d_item_tags.upper_bound(x); } + /*! + * \brief Returns true if the current thread is ready to execute + * output_blocked_callback(), false otherwise. Note if the default + * output_blocked_callback is overridden this function should also be + * overridden. + */ + virtual bool output_blkd_cb_ready(int output_multiple) { return false; } + + /*! + * \brief Callback function that the scheduler will call when it determines + * that the output is blocked. Override this function if needed. + */ + virtual bool output_blocked_callback(int output_multiple, bool force = false) + { + return false; + } + + /*! + * \brief Increment the number of active pointers for this buffer. + */ + inline void increment_active() + { + gr::thread::scoped_lock lock(d_mutex); + + d_cv.wait(lock, [this]() { return d_callback_flag == false; }); + ++d_active_pointer_counter; + } + + /*! + * \brief Decrement the number of active pointers for this buffer and signal + * anyone waiting when the count reaches zero. + */ + inline void decrement_active() + { + gr::thread::scoped_lock lock(d_mutex); + + if (--d_active_pointer_counter == 0) + d_cv.notify_all(); + } + + /*! + * \brief "on_lock" function from the custom_lock_if. + */ + void on_lock(gr::thread::scoped_lock& lock); + + /*! + * \brief "on_unlock" function from the custom_lock_if. + */ + void on_unlock(); + + friend std::ostream& operator<<(std::ostream& os, const buffer& buf); + // ------------------------------------------------------------------------- private: friend class buffer_reader; + friend class buffer_reader_sm; + friend GR_RUNTIME_API buffer_sptr make_buffer(int nitems, size_t sizeof_item, + uint64_t downstream_lcm_nitems, block_sptr link); friend GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf, int nzero_preload, @@ -154,12 +236,18 @@ private: protected: char* d_base; // base address of buffer inside d_vmcircbuf. unsigned int d_bufsize; // in items + BufferMappingType d_buf_map_type; // Keep track of maximum sample delay of any reader; Only prune tags past this. unsigned d_max_reader_delay; -private: - std::unique_ptr<gr::vmcircbuf> d_vmcircbuf; + // Keep track of the maximum sample history requirements of all blocks that + // consume from this buffer + unsigned d_max_reader_history; + + // Indicates if d_max_reader_history > 1 + bool d_has_history; + size_t d_sizeof_item; // in bytes std::vector<buffer_reader*> d_readers; std::weak_ptr<block> d_link; // block that writes to this buffer @@ -167,6 +255,7 @@ private: // // The mutex protects d_write_index, d_abs_write_offset, d_done, d_item_tags // and the d_read_index's and d_abs_read_offset's in the buffer readers. + // Also d_callback_flag and d_active_pointer_counter. // gr::thread::mutex d_mutex; unsigned int d_write_index; // in items [0,d_bufsize) @@ -174,45 +263,47 @@ private: bool d_done; std::multimap<uint64_t, tag_t> d_item_tags; uint64_t d_last_min_items_read; + // + gr::thread::condition_variable d_cv; + bool d_callback_flag; + uint32_t d_active_pointer_counter; - unsigned index_add(unsigned a, unsigned b) - { - unsigned s = a + b; - - if (s >= d_bufsize) - s -= d_bufsize; - - assert(s < d_bufsize); - return s; - } - - unsigned index_sub(unsigned a, unsigned b) - { - int s = a - b; + uint64_t d_downstream_lcm_nitems; + uint64_t d_write_multiple; - if (s < 0) - s += d_bufsize; + /*! + * \brief Increment read or write index for this buffer + */ + virtual unsigned index_add(unsigned a, unsigned b) = 0; - assert((unsigned)s < d_bufsize); - return s; - } + /*! + * \brief Decrement read or write index for this buffer + */ + virtual unsigned index_sub(unsigned a, unsigned b) = 0; - virtual bool allocate_buffer(int nitems, size_t sizeof_item); + virtual bool allocate_buffer(int nitems, size_t sizeof_item) { return false; }; /*! * \brief constructor is private. Use gr_make_buffer to create instances. * * Allocate a buffer that holds at least \p nitems of size \p sizeof_item. * + * \param buftype is an enum type that describes the buffer TODO: fix me * \param nitems is the minimum number of items the buffer will hold. * \param sizeof_item is the size of an item in bytes. + * \param downstream_lcm_nitems is the least common multiple of the items to + * read by downstream block(s) * \param link is the block that writes to this buffer. * * The total size of the buffer will be rounded up to a system * dependent boundary. This is typically the system page size, but * under MS windows is 64KB. */ - buffer(int nitems, size_t sizeof_item, block_sptr link); + buffer(BufferMappingType buftype, + int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link); /*! * \brief disassociate \p reader from this buffer @@ -220,135 +311,9 @@ private: void drop_reader(buffer_reader* reader); }; -/*! - * \brief Create a new gr::buffer_reader and attach it to buffer \p buf - * \param buf is the buffer the \p gr::buffer_reader reads from. - * \param nzero_preload -- number of zero items to "preload" into buffer. - * \param link is the block that reads from the buffer using this gr::buffer_reader. - * \param delay Optional setting to declare the buffer's sample delay. - */ -GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf, - int nzero_preload, - block_sptr link = block_sptr(), - int delay = 0); - //! returns # of buffers currently allocated GR_RUNTIME_API long buffer_ncurrently_allocated(); - -// --------------------------------------------------------------------------- - -/*! - * \brief How we keep track of the readers of a gr::buffer. - * \ingroup internal - */ -class GR_RUNTIME_API buffer_reader -{ -public: - ~buffer_reader(); - - /*! - * Declares the sample delay for this reader. - * - * See gr::block::declare_sample_delay for details. - * - * \param delay The new sample delay - */ - void declare_sample_delay(unsigned delay); - - /*! - * Gets the sample delay for this reader. - * - * See gr::block::sample_delay for details. - */ - unsigned sample_delay() const; - - /*! - * \brief Return number of items available for reading. - */ - int items_available() const; - - /*! - * \brief Return buffer this reader reads from. - */ - buffer_sptr buffer() const { return d_buffer; } - - /*! - * \brief Return maximum number of items that could ever be available for reading. - * This is used as a sanity check in the scheduler to avoid looping forever. - */ - int max_possible_items_available() const { return d_buffer->d_bufsize - 1; } - - /*! - * \brief return pointer to read buffer. - * - * The return value points to items_available() number of items - */ - const void* read_pointer(); - - /* - * \brief tell buffer we read \p items from it - */ - void update_read_pointer(int nitems); - - void set_done(bool done) { d_buffer->set_done(done); } - bool done() const { return d_buffer->done(); } - - gr::thread::mutex* mutex() { return d_buffer->mutex(); } - - uint64_t nitems_read() { return d_abs_read_offset; } - - void reset_nitem_counter() { d_abs_read_offset = 0; } - - size_t get_sizeof_item() { return d_buffer->get_sizeof_item(); } - - /*! - * \brief Return the block that reads via this reader. - * - */ - block_sptr link() { return block_sptr(d_link); } - - /*! - * \brief Given a [start,end), returns a vector all tags in the range. - * - * Get a vector of tags in given range. Range of counts is from start to end-1. - * - * Tags are tuples of: - * (item count, source id, key, value) - * - * \param v a vector reference to return tags into - * \param abs_start a uint64 count of the start of the range of interest - * \param abs_end a uint64 count of the end of the range of interest - * \param id the unique ID of the block to make sure already deleted tags - * are not returned - */ - void get_tags_in_range(std::vector<tag_t>& v, - uint64_t abs_start, - uint64_t abs_end, - long id); - - // ------------------------------------------------------------------------- - -private: - friend class buffer; - friend GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf, - int nzero_preload, - block_sptr link, - int delay); - - buffer_sptr d_buffer; - unsigned int d_read_index; // in items [0,d->buffer.d_bufsize) - uint64_t d_abs_read_offset; // num items seen since the start - std::weak_ptr<block> d_link; // block that reads via this buffer reader - unsigned d_attr_delay; // sample delay attribute for tag propagation - - //! constructor is private. Use gr::buffer::add_reader to create instances - buffer_reader(buffer_sptr buffer, unsigned int read_index, block_sptr link); -}; - -//! returns # of buffer_readers currently allocated -GR_RUNTIME_API long buffer_reader_ncurrently_allocated(); - } /* namespace gr */ #endif /* INCLUDED_GR_RUNTIME_BUFFER_H */ |