summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/include/gnuradio/buffer.h
diff options
context:
space:
mode:
authorDavid Sorber <david.sorber@blacklynx.tech>2021-05-12 08:59:21 -0400
committermormj <34754695+mormj@users.noreply.github.com>2021-10-25 11:27:01 -0400
commit788827ae116bef871e144abd39b1e4482208eabe (patch)
treedcfee04a77db5bb3c8042be5b0b95c54bf8759c9 /gnuradio-runtime/include/gnuradio/buffer.h
parentb8713810a2d07ac1a632bd7bfb23f3f48f67e222 (diff)
runtime: Custom Buffer/Accelerator Device Support - Milestone 1
Custom Buffer/Accelerator Device Support - Milestone 1 changes: * Refactored existing single mapped buffer code and created single mapped buffer abstraction; wrapping within single mapped buffers is handled explicitly by input blocked and output blocked callbacks that are called from block_executor * Added simple custom buffer allocation interface (NOTE: this interface will change for milestone 2) * Accelerated blocks are still responsible for data transfer but the custom buffer interface eliminates the double copy problem Signed-off-by: David Sorber <david.sorber@blacklynx.tech>
Diffstat (limited to 'gnuradio-runtime/include/gnuradio/buffer.h')
-rw-r--r--gnuradio-runtime/include/gnuradio/buffer.h275
1 files changed, 120 insertions, 155 deletions
diff --git a/gnuradio-runtime/include/gnuradio/buffer.h b/gnuradio-runtime/include/gnuradio/buffer.h
index 55313df6c0..037fade172 100644
--- a/gnuradio-runtime/include/gnuradio/buffer.h
+++ b/gnuradio-runtime/include/gnuradio/buffer.h
@@ -12,16 +12,24 @@
#define INCLUDED_GR_RUNTIME_BUFFER_H
#include <gnuradio/api.h>
+#include <gnuradio/custom_lock.h>
#include <gnuradio/logger.h>
#include <gnuradio/runtime_types.h>
#include <gnuradio/tags.h>
#include <gnuradio/thread/thread.h>
+#include <boost/weak_ptr.hpp>
+#include <iostream>
#include <map>
#include <memory>
+
namespace gr {
class vmcircbuf;
+class buffer_reader;
+class buffer_reader_sm;
+
+enum class BufferMappingType { DoubleMapped, SingleMapped };
/*!
* \brief Allocate a buffer that holds at least \p nitems of size \p sizeof_item.
@@ -36,13 +44,15 @@ class vmcircbuf;
*/
GR_RUNTIME_API buffer_sptr make_buffer(int nitems,
size_t sizeof_item,
- block_sptr link = block_sptr());
+ uint64_t downstream_lcm_nitems,
+ block_sptr link = block_sptr(),
+ block_sptr buf_owner = block_sptr());
/*!
* \brief Single writer, multiple reader fifo.
* \ingroup internal
*/
-class GR_RUNTIME_API buffer
+class GR_RUNTIME_API buffer : public custom_lock_if
{
public:
gr::logger_ptr d_logger;
@@ -51,14 +61,19 @@ public:
virtual ~buffer();
/*!
+ * \brief return the buffer's mapping type
+ */
+ BufferMappingType get_mapping_type() { return d_buf_map_type; }
+
+ /*!
* \brief return number of items worth of space available for writing
*/
- int space_available();
+ virtual int space_available() = 0;
/*!
* \brief return size of this buffer in items
*/
- int bufsize() const { return d_bufsize; }
+ unsigned int bufsize() const { return d_bufsize; }
/*!
* \brief return the base address of the buffer
@@ -71,7 +86,7 @@ public:
* The return value points at space that can hold at least
* space_available() items.
*/
- void* write_pointer();
+ virtual void* write_pointer();
/*!
* \brief tell buffer that we wrote \p nitems into it
@@ -93,10 +108,22 @@ public:
uint64_t nitems_written() { return d_abs_write_offset; }
- void reset_nitem_counter() { d_abs_write_offset = 0; }
+ void reset_nitem_counter()
+ {
+ d_write_index = 0;
+ d_abs_write_offset = 0;
+ }
size_t get_sizeof_item() { return d_sizeof_item; }
+ uint64_t get_downstream_lcm_nitems() { return d_downstream_lcm_nitems; }
+
+ virtual void update_reader_block_history(unsigned history, int delay)
+ {
+ d_max_reader_history = std::max(d_max_reader_history, history);
+ d_has_history = (d_max_reader_history > 1);
+ }
+
/*!
* \brief Adds a new tag to the buffer.
*
@@ -139,12 +166,67 @@ public:
return d_item_tags.upper_bound(x);
}
+ /*!
+ * \brief Returns true if the current thread is ready to execute
+ * output_blocked_callback(), false otherwise. Note if the default
+ * output_blocked_callback is overridden this function should also be
+ * overridden.
+ */
+ virtual bool output_blkd_cb_ready(int output_multiple) { return false; }
+
+ /*!
+ * \brief Callback function that the scheduler will call when it determines
+ * that the output is blocked. Override this function if needed.
+ */
+ virtual bool output_blocked_callback(int output_multiple, bool force = false)
+ {
+ return false;
+ }
+
+ /*!
+ * \brief Increment the number of active pointers for this buffer.
+ */
+ inline void increment_active()
+ {
+ gr::thread::scoped_lock lock(d_mutex);
+
+ d_cv.wait(lock, [this]() { return d_callback_flag == false; });
+ ++d_active_pointer_counter;
+ }
+
+ /*!
+ * \brief Decrement the number of active pointers for this buffer and signal
+ * anyone waiting when the count reaches zero.
+ */
+ inline void decrement_active()
+ {
+ gr::thread::scoped_lock lock(d_mutex);
+
+ if (--d_active_pointer_counter == 0)
+ d_cv.notify_all();
+ }
+
+ /*!
+ * \brief "on_lock" function from the custom_lock_if.
+ */
+ void on_lock(gr::thread::scoped_lock& lock);
+
+ /*!
+ * \brief "on_unlock" function from the custom_lock_if.
+ */
+ void on_unlock();
+
+ friend std::ostream& operator<<(std::ostream& os, const buffer& buf);
+
// -------------------------------------------------------------------------
private:
friend class buffer_reader;
+ friend class buffer_reader_sm;
+
friend GR_RUNTIME_API buffer_sptr make_buffer(int nitems,
size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
block_sptr link);
friend GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf,
int nzero_preload,
@@ -154,12 +236,18 @@ private:
protected:
char* d_base; // base address of buffer inside d_vmcircbuf.
unsigned int d_bufsize; // in items
+ BufferMappingType d_buf_map_type;
// Keep track of maximum sample delay of any reader; Only prune tags past this.
unsigned d_max_reader_delay;
-private:
- std::unique_ptr<gr::vmcircbuf> d_vmcircbuf;
+ // Keep track of the maximum sample history requirements of all blocks that
+ // consume from this buffer
+ unsigned d_max_reader_history;
+
+ // Indicates if d_max_reader_history > 1
+ bool d_has_history;
+
size_t d_sizeof_item; // in bytes
std::vector<buffer_reader*> d_readers;
std::weak_ptr<block> d_link; // block that writes to this buffer
@@ -167,6 +255,7 @@ private:
//
// The mutex protects d_write_index, d_abs_write_offset, d_done, d_item_tags
// and the d_read_index's and d_abs_read_offset's in the buffer readers.
+ // Also d_callback_flag and d_active_pointer_counter.
//
gr::thread::mutex d_mutex;
unsigned int d_write_index; // in items [0,d_bufsize)
@@ -174,45 +263,47 @@ private:
bool d_done;
std::multimap<uint64_t, tag_t> d_item_tags;
uint64_t d_last_min_items_read;
+ //
+ gr::thread::condition_variable d_cv;
+ bool d_callback_flag;
+ uint32_t d_active_pointer_counter;
- unsigned index_add(unsigned a, unsigned b)
- {
- unsigned s = a + b;
-
- if (s >= d_bufsize)
- s -= d_bufsize;
-
- assert(s < d_bufsize);
- return s;
- }
-
- unsigned index_sub(unsigned a, unsigned b)
- {
- int s = a - b;
+ uint64_t d_downstream_lcm_nitems;
+ uint64_t d_write_multiple;
- if (s < 0)
- s += d_bufsize;
+ /*!
+ * \brief Increment read or write index for this buffer
+ */
+ virtual unsigned index_add(unsigned a, unsigned b) = 0;
- assert((unsigned)s < d_bufsize);
- return s;
- }
+ /*!
+ * \brief Decrement read or write index for this buffer
+ */
+ virtual unsigned index_sub(unsigned a, unsigned b) = 0;
- virtual bool allocate_buffer(int nitems, size_t sizeof_item);
+ virtual bool allocate_buffer(int nitems, size_t sizeof_item) { return false; };
/*!
* \brief constructor is private. Use gr_make_buffer to create instances.
*
* Allocate a buffer that holds at least \p nitems of size \p sizeof_item.
*
+ * \param buftype is an enum type that describes the buffer TODO: fix me
* \param nitems is the minimum number of items the buffer will hold.
* \param sizeof_item is the size of an item in bytes.
+ * \param downstream_lcm_nitems is the least common multiple of the items to
+ * read by downstream block(s)
* \param link is the block that writes to this buffer.
*
* The total size of the buffer will be rounded up to a system
* dependent boundary. This is typically the system page size, but
* under MS windows is 64KB.
*/
- buffer(int nitems, size_t sizeof_item, block_sptr link);
+ buffer(BufferMappingType buftype,
+ int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ block_sptr link);
/*!
* \brief disassociate \p reader from this buffer
@@ -220,135 +311,9 @@ private:
void drop_reader(buffer_reader* reader);
};
-/*!
- * \brief Create a new gr::buffer_reader and attach it to buffer \p buf
- * \param buf is the buffer the \p gr::buffer_reader reads from.
- * \param nzero_preload -- number of zero items to "preload" into buffer.
- * \param link is the block that reads from the buffer using this gr::buffer_reader.
- * \param delay Optional setting to declare the buffer's sample delay.
- */
-GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf,
- int nzero_preload,
- block_sptr link = block_sptr(),
- int delay = 0);
-
//! returns # of buffers currently allocated
GR_RUNTIME_API long buffer_ncurrently_allocated();
-
-// ---------------------------------------------------------------------------
-
-/*!
- * \brief How we keep track of the readers of a gr::buffer.
- * \ingroup internal
- */
-class GR_RUNTIME_API buffer_reader
-{
-public:
- ~buffer_reader();
-
- /*!
- * Declares the sample delay for this reader.
- *
- * See gr::block::declare_sample_delay for details.
- *
- * \param delay The new sample delay
- */
- void declare_sample_delay(unsigned delay);
-
- /*!
- * Gets the sample delay for this reader.
- *
- * See gr::block::sample_delay for details.
- */
- unsigned sample_delay() const;
-
- /*!
- * \brief Return number of items available for reading.
- */
- int items_available() const;
-
- /*!
- * \brief Return buffer this reader reads from.
- */
- buffer_sptr buffer() const { return d_buffer; }
-
- /*!
- * \brief Return maximum number of items that could ever be available for reading.
- * This is used as a sanity check in the scheduler to avoid looping forever.
- */
- int max_possible_items_available() const { return d_buffer->d_bufsize - 1; }
-
- /*!
- * \brief return pointer to read buffer.
- *
- * The return value points to items_available() number of items
- */
- const void* read_pointer();
-
- /*
- * \brief tell buffer we read \p items from it
- */
- void update_read_pointer(int nitems);
-
- void set_done(bool done) { d_buffer->set_done(done); }
- bool done() const { return d_buffer->done(); }
-
- gr::thread::mutex* mutex() { return d_buffer->mutex(); }
-
- uint64_t nitems_read() { return d_abs_read_offset; }
-
- void reset_nitem_counter() { d_abs_read_offset = 0; }
-
- size_t get_sizeof_item() { return d_buffer->get_sizeof_item(); }
-
- /*!
- * \brief Return the block that reads via this reader.
- *
- */
- block_sptr link() { return block_sptr(d_link); }
-
- /*!
- * \brief Given a [start,end), returns a vector all tags in the range.
- *
- * Get a vector of tags in given range. Range of counts is from start to end-1.
- *
- * Tags are tuples of:
- * (item count, source id, key, value)
- *
- * \param v a vector reference to return tags into
- * \param abs_start a uint64 count of the start of the range of interest
- * \param abs_end a uint64 count of the end of the range of interest
- * \param id the unique ID of the block to make sure already deleted tags
- * are not returned
- */
- void get_tags_in_range(std::vector<tag_t>& v,
- uint64_t abs_start,
- uint64_t abs_end,
- long id);
-
- // -------------------------------------------------------------------------
-
-private:
- friend class buffer;
- friend GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf,
- int nzero_preload,
- block_sptr link,
- int delay);
-
- buffer_sptr d_buffer;
- unsigned int d_read_index; // in items [0,d->buffer.d_bufsize)
- uint64_t d_abs_read_offset; // num items seen since the start
- std::weak_ptr<block> d_link; // block that reads via this buffer reader
- unsigned d_attr_delay; // sample delay attribute for tag propagation
-
- //! constructor is private. Use gr::buffer::add_reader to create instances
- buffer_reader(buffer_sptr buffer, unsigned int read_index, block_sptr link);
-};
-
-//! returns # of buffer_readers currently allocated
-GR_RUNTIME_API long buffer_reader_ncurrently_allocated();
-
} /* namespace gr */
#endif /* INCLUDED_GR_RUNTIME_BUFFER_H */