diff options
45 files changed, 2463 insertions, 515 deletions
diff --git a/gnuradio-runtime/include/gnuradio/CMakeLists.txt b/gnuradio-runtime/include/gnuradio/CMakeLists.txt index 4de1ceb353..9e22212b7b 100644 --- a/gnuradio-runtime/include/gnuradio/CMakeLists.txt +++ b/gnuradio-runtime/include/gnuradio/CMakeLists.txt @@ -20,7 +20,12 @@ install(FILES block_gateway.h block_registry.h buffer.h + buffer_double_mapped.h + buffer_reader.h + buffer_reader_sm.h + buffer_single_mapped.h constants.h + buffer_type.h endianness.h expj.h flowgraph.h diff --git a/gnuradio-runtime/include/gnuradio/basic_block.h b/gnuradio-runtime/include/gnuradio/basic_block.h index 7a265cd05e..3816e8ab98 100644 --- a/gnuradio-runtime/include/gnuradio/basic_block.h +++ b/gnuradio-runtime/include/gnuradio/basic_block.h @@ -127,6 +127,18 @@ protected: // Message passing interface pmt::pmt_t d_message_subscribers; + /*! + * \brief This is meant to be called by derived classes (e.g. block) to get + * a shared pointer internally. This is needed because + * std::enable_shared_from_this doesn't seem to work with derived classes + * in an inheritance hierarchy. + */ + template <typename Derived> + std::shared_ptr<Derived> shared_from_base() + { + return std::static_pointer_cast<Derived>(shared_from_this()); + } + public: pmt::pmt_t message_subscribers(pmt::pmt_t port); ~basic_block() override; diff --git a/gnuradio-runtime/include/gnuradio/block.h b/gnuradio-runtime/include/gnuradio/block.h index 0e25509d00..ee90a35d2c 100644 --- a/gnuradio-runtime/include/gnuradio/block.h +++ b/gnuradio-runtime/include/gnuradio/block.h @@ -11,8 +11,11 @@ #ifndef INCLUDED_GR_RUNTIME_BLOCK_H #define INCLUDED_GR_RUNTIME_BLOCK_H +#include <memory> + #include <gnuradio/api.h> #include <gnuradio/basic_block.h> +#include <gnuradio/buffer_type.h> #include <gnuradio/config.h> #include <gnuradio/logger.h> #include <gnuradio/tags.h> @@ -514,6 +517,78 @@ public: */ void set_min_output_buffer(int port, long min_output_buffer); + /*! + * \brief Allocate the block_detail and necessary output buffers for this + * block. + */ + void allocate_detail(int ninputs, + int noutputs, + const std::vector<int>& downstream_max_nitems_vec, + const std::vector<uint64_t>& downstream_lcm_nitems_vec); + + // --------------- Custom buffer-related functions ------------- + + /*! + * \brief Replace the block's buffer with a new one owned by the block_owner + * parameter + * + * \details + * This function is used to replace the buffer on the specified output port + * of the block with a new buffer that is "owned" by the specified block. This + * function will only be called if a downstream block is using a custom buffer + * that is incompatible with the default buffer type created by this block. + * + */ + buffer_sptr replace_buffer(uint32_t out_port, block_sptr block_owner); + + /*! + * \brief Return the type of custom buffer used by the block + * + * \details + * Blocks that wish to allocate custom buffers should override this function. + */ + virtual buffer_type_t get_buffer_type() + { +#if DEBUG_SINGLE_MAPPED + return buftype_CUSTOM_HOST::get(); +#else + return buftype_DEFAULT_NON_CUSTOM::get(); +#endif + } + + /*! + * \brief Allocate a custom buffer for the block + * + * \details + * Blocks that wish to allocate custom buffers should override this function. + * + * \param size the size of the buffer to allocate in bytes + */ + virtual char* allocate_custom_buffer(size_t size) + { +#if DEBUG_SINGLE_MAPPED + return new char[size](); +#else + return nullptr; +#endif + } + + /*! + * \brief Free a custom buffer previously allocated by allocate_custom_buffer() + * + * \details + * Blocks that wish to allocate custom buffers should override this function. + * + * \param buffer a pointer to the buffer + */ + virtual void free_custom_buffer(char* buffer) + { +#if DEBUG_SINGLE_MAPPED + delete[] buffer; +#endif + } + + // --------------- Performance counter functions ------------- /*! @@ -909,6 +984,14 @@ protected: void enable_update_rate(bool en); + /*! + * \brief Allocate a buffer for the given output port of this block. Note + * that the downstream max number of items must be passed in to this + * function for consideration. + */ + buffer_sptr + allocate_buffer(int port, int downstream_max_nitems, uint64_t downstream_lcm_nitems); + std::vector<long> d_max_output_buffer; std::vector<long> d_min_output_buffer; diff --git a/gnuradio-runtime/include/gnuradio/block_detail.h b/gnuradio-runtime/include/gnuradio/block_detail.h index 454945f0f5..4ab2ae6ed1 100644 --- a/gnuradio-runtime/include/gnuradio/block_detail.h +++ b/gnuradio-runtime/include/gnuradio/block_detail.h @@ -12,6 +12,8 @@ #define INCLUDED_GR_RUNTIME_BLOCK_DETAIL_H #include <gnuradio/api.h> +#include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader.h> #include <gnuradio/high_res_timer.h> #include <gnuradio/logger.h> #include <gnuradio/runtime_types.h> @@ -193,6 +195,19 @@ public: */ int set_thread_priority(int priority); + /*! + * Post general_work() cleanup to decrement the active counts for all inputs + * and outputs. + */ + void post_work_cleanup() + { + // Decrement active counts for all inputs and outputs + for (int i = 0; i < noutputs(); i++) + output(i)->decrement_active(); + for (int i = 0; i < ninputs(); i++) + input(i)->buffer()->decrement_active(); + } + bool threaded; // set if thread is currently running. gr::thread::gr_thread_t thread; // portable thread handle diff --git a/gnuradio-runtime/include/gnuradio/buffer.h b/gnuradio-runtime/include/gnuradio/buffer.h index 55313df6c0..037fade172 100644 --- a/gnuradio-runtime/include/gnuradio/buffer.h +++ b/gnuradio-runtime/include/gnuradio/buffer.h @@ -12,16 +12,24 @@ #define INCLUDED_GR_RUNTIME_BUFFER_H #include <gnuradio/api.h> +#include <gnuradio/custom_lock.h> #include <gnuradio/logger.h> #include <gnuradio/runtime_types.h> #include <gnuradio/tags.h> #include <gnuradio/thread/thread.h> +#include <boost/weak_ptr.hpp> +#include <iostream> #include <map> #include <memory> + namespace gr { class vmcircbuf; +class buffer_reader; +class buffer_reader_sm; + +enum class BufferMappingType { DoubleMapped, SingleMapped }; /*! * \brief Allocate a buffer that holds at least \p nitems of size \p sizeof_item. @@ -36,13 +44,15 @@ class vmcircbuf; */ GR_RUNTIME_API buffer_sptr make_buffer(int nitems, size_t sizeof_item, - block_sptr link = block_sptr()); + uint64_t downstream_lcm_nitems, + block_sptr link = block_sptr(), + block_sptr buf_owner = block_sptr()); /*! * \brief Single writer, multiple reader fifo. * \ingroup internal */ -class GR_RUNTIME_API buffer +class GR_RUNTIME_API buffer : public custom_lock_if { public: gr::logger_ptr d_logger; @@ -51,14 +61,19 @@ public: virtual ~buffer(); /*! + * \brief return the buffer's mapping type + */ + BufferMappingType get_mapping_type() { return d_buf_map_type; } + + /*! * \brief return number of items worth of space available for writing */ - int space_available(); + virtual int space_available() = 0; /*! * \brief return size of this buffer in items */ - int bufsize() const { return d_bufsize; } + unsigned int bufsize() const { return d_bufsize; } /*! * \brief return the base address of the buffer @@ -71,7 +86,7 @@ public: * The return value points at space that can hold at least * space_available() items. */ - void* write_pointer(); + virtual void* write_pointer(); /*! * \brief tell buffer that we wrote \p nitems into it @@ -93,10 +108,22 @@ public: uint64_t nitems_written() { return d_abs_write_offset; } - void reset_nitem_counter() { d_abs_write_offset = 0; } + void reset_nitem_counter() + { + d_write_index = 0; + d_abs_write_offset = 0; + } size_t get_sizeof_item() { return d_sizeof_item; } + uint64_t get_downstream_lcm_nitems() { return d_downstream_lcm_nitems; } + + virtual void update_reader_block_history(unsigned history, int delay) + { + d_max_reader_history = std::max(d_max_reader_history, history); + d_has_history = (d_max_reader_history > 1); + } + /*! * \brief Adds a new tag to the buffer. * @@ -139,12 +166,67 @@ public: return d_item_tags.upper_bound(x); } + /*! + * \brief Returns true if the current thread is ready to execute + * output_blocked_callback(), false otherwise. Note if the default + * output_blocked_callback is overridden this function should also be + * overridden. + */ + virtual bool output_blkd_cb_ready(int output_multiple) { return false; } + + /*! + * \brief Callback function that the scheduler will call when it determines + * that the output is blocked. Override this function if needed. + */ + virtual bool output_blocked_callback(int output_multiple, bool force = false) + { + return false; + } + + /*! + * \brief Increment the number of active pointers for this buffer. + */ + inline void increment_active() + { + gr::thread::scoped_lock lock(d_mutex); + + d_cv.wait(lock, [this]() { return d_callback_flag == false; }); + ++d_active_pointer_counter; + } + + /*! + * \brief Decrement the number of active pointers for this buffer and signal + * anyone waiting when the count reaches zero. + */ + inline void decrement_active() + { + gr::thread::scoped_lock lock(d_mutex); + + if (--d_active_pointer_counter == 0) + d_cv.notify_all(); + } + + /*! + * \brief "on_lock" function from the custom_lock_if. + */ + void on_lock(gr::thread::scoped_lock& lock); + + /*! + * \brief "on_unlock" function from the custom_lock_if. + */ + void on_unlock(); + + friend std::ostream& operator<<(std::ostream& os, const buffer& buf); + // ------------------------------------------------------------------------- private: friend class buffer_reader; + friend class buffer_reader_sm; + friend GR_RUNTIME_API buffer_sptr make_buffer(int nitems, size_t sizeof_item, + uint64_t downstream_lcm_nitems, block_sptr link); friend GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf, int nzero_preload, @@ -154,12 +236,18 @@ private: protected: char* d_base; // base address of buffer inside d_vmcircbuf. unsigned int d_bufsize; // in items + BufferMappingType d_buf_map_type; // Keep track of maximum sample delay of any reader; Only prune tags past this. unsigned d_max_reader_delay; -private: - std::unique_ptr<gr::vmcircbuf> d_vmcircbuf; + // Keep track of the maximum sample history requirements of all blocks that + // consume from this buffer + unsigned d_max_reader_history; + + // Indicates if d_max_reader_history > 1 + bool d_has_history; + size_t d_sizeof_item; // in bytes std::vector<buffer_reader*> d_readers; std::weak_ptr<block> d_link; // block that writes to this buffer @@ -167,6 +255,7 @@ private: // // The mutex protects d_write_index, d_abs_write_offset, d_done, d_item_tags // and the d_read_index's and d_abs_read_offset's in the buffer readers. + // Also d_callback_flag and d_active_pointer_counter. // gr::thread::mutex d_mutex; unsigned int d_write_index; // in items [0,d_bufsize) @@ -174,45 +263,47 @@ private: bool d_done; std::multimap<uint64_t, tag_t> d_item_tags; uint64_t d_last_min_items_read; + // + gr::thread::condition_variable d_cv; + bool d_callback_flag; + uint32_t d_active_pointer_counter; - unsigned index_add(unsigned a, unsigned b) - { - unsigned s = a + b; - - if (s >= d_bufsize) - s -= d_bufsize; - - assert(s < d_bufsize); - return s; - } - - unsigned index_sub(unsigned a, unsigned b) - { - int s = a - b; + uint64_t d_downstream_lcm_nitems; + uint64_t d_write_multiple; - if (s < 0) - s += d_bufsize; + /*! + * \brief Increment read or write index for this buffer + */ + virtual unsigned index_add(unsigned a, unsigned b) = 0; - assert((unsigned)s < d_bufsize); - return s; - } + /*! + * \brief Decrement read or write index for this buffer + */ + virtual unsigned index_sub(unsigned a, unsigned b) = 0; - virtual bool allocate_buffer(int nitems, size_t sizeof_item); + virtual bool allocate_buffer(int nitems, size_t sizeof_item) { return false; }; /*! * \brief constructor is private. Use gr_make_buffer to create instances. * * Allocate a buffer that holds at least \p nitems of size \p sizeof_item. * + * \param buftype is an enum type that describes the buffer TODO: fix me * \param nitems is the minimum number of items the buffer will hold. * \param sizeof_item is the size of an item in bytes. + * \param downstream_lcm_nitems is the least common multiple of the items to + * read by downstream block(s) * \param link is the block that writes to this buffer. * * The total size of the buffer will be rounded up to a system * dependent boundary. This is typically the system page size, but * under MS windows is 64KB. */ - buffer(int nitems, size_t sizeof_item, block_sptr link); + buffer(BufferMappingType buftype, + int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link); /*! * \brief disassociate \p reader from this buffer @@ -220,135 +311,9 @@ private: void drop_reader(buffer_reader* reader); }; -/*! - * \brief Create a new gr::buffer_reader and attach it to buffer \p buf - * \param buf is the buffer the \p gr::buffer_reader reads from. - * \param nzero_preload -- number of zero items to "preload" into buffer. - * \param link is the block that reads from the buffer using this gr::buffer_reader. - * \param delay Optional setting to declare the buffer's sample delay. - */ -GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf, - int nzero_preload, - block_sptr link = block_sptr(), - int delay = 0); - //! returns # of buffers currently allocated GR_RUNTIME_API long buffer_ncurrently_allocated(); - -// --------------------------------------------------------------------------- - -/*! - * \brief How we keep track of the readers of a gr::buffer. - * \ingroup internal - */ -class GR_RUNTIME_API buffer_reader -{ -public: - ~buffer_reader(); - - /*! - * Declares the sample delay for this reader. - * - * See gr::block::declare_sample_delay for details. - * - * \param delay The new sample delay - */ - void declare_sample_delay(unsigned delay); - - /*! - * Gets the sample delay for this reader. - * - * See gr::block::sample_delay for details. - */ - unsigned sample_delay() const; - - /*! - * \brief Return number of items available for reading. - */ - int items_available() const; - - /*! - * \brief Return buffer this reader reads from. - */ - buffer_sptr buffer() const { return d_buffer; } - - /*! - * \brief Return maximum number of items that could ever be available for reading. - * This is used as a sanity check in the scheduler to avoid looping forever. - */ - int max_possible_items_available() const { return d_buffer->d_bufsize - 1; } - - /*! - * \brief return pointer to read buffer. - * - * The return value points to items_available() number of items - */ - const void* read_pointer(); - - /* - * \brief tell buffer we read \p items from it - */ - void update_read_pointer(int nitems); - - void set_done(bool done) { d_buffer->set_done(done); } - bool done() const { return d_buffer->done(); } - - gr::thread::mutex* mutex() { return d_buffer->mutex(); } - - uint64_t nitems_read() { return d_abs_read_offset; } - - void reset_nitem_counter() { d_abs_read_offset = 0; } - - size_t get_sizeof_item() { return d_buffer->get_sizeof_item(); } - - /*! - * \brief Return the block that reads via this reader. - * - */ - block_sptr link() { return block_sptr(d_link); } - - /*! - * \brief Given a [start,end), returns a vector all tags in the range. - * - * Get a vector of tags in given range. Range of counts is from start to end-1. - * - * Tags are tuples of: - * (item count, source id, key, value) - * - * \param v a vector reference to return tags into - * \param abs_start a uint64 count of the start of the range of interest - * \param abs_end a uint64 count of the end of the range of interest - * \param id the unique ID of the block to make sure already deleted tags - * are not returned - */ - void get_tags_in_range(std::vector<tag_t>& v, - uint64_t abs_start, - uint64_t abs_end, - long id); - - // ------------------------------------------------------------------------- - -private: - friend class buffer; - friend GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf, - int nzero_preload, - block_sptr link, - int delay); - - buffer_sptr d_buffer; - unsigned int d_read_index; // in items [0,d->buffer.d_bufsize) - uint64_t d_abs_read_offset; // num items seen since the start - std::weak_ptr<block> d_link; // block that reads via this buffer reader - unsigned d_attr_delay; // sample delay attribute for tag propagation - - //! constructor is private. Use gr::buffer::add_reader to create instances - buffer_reader(buffer_sptr buffer, unsigned int read_index, block_sptr link); -}; - -//! returns # of buffer_readers currently allocated -GR_RUNTIME_API long buffer_reader_ncurrently_allocated(); - } /* namespace gr */ #endif /* INCLUDED_GR_RUNTIME_BUFFER_H */ diff --git a/gnuradio-runtime/include/gnuradio/buffer_double_mapped.h b/gnuradio-runtime/include/gnuradio/buffer_double_mapped.h new file mode 100644 index 0000000000..b8cc7cdc87 --- /dev/null +++ b/gnuradio-runtime/include/gnuradio/buffer_double_mapped.h @@ -0,0 +1,116 @@ +/* -*- c++ -*- */ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifndef INCLUDED_GR_RUNTIME_BUFFER_DOUBLE_MAPPED_H +#define INCLUDED_GR_RUNTIME_BUFFER_DOUBLE_MAPPED_H + +#include <gnuradio/api.h> +#include <gnuradio/buffer.h> +#include <gnuradio/logger.h> +#include <gnuradio/runtime_types.h> + +namespace gr { + +class vmcircbuf; + +/*! + * \brief Note this function is only used and really intended to be used in + * qa_buffer.cc for the unit tests of buffer_double_mapped. + * + */ +GR_RUNTIME_API buffer_sptr make_buffer_double_mapped(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link = block_sptr()); + +/*! + * \brief Single writer, multiple reader fifo. + * \ingroup internal + */ +class GR_RUNTIME_API buffer_double_mapped : public buffer +{ +public: + gr::logger_ptr d_logger; + gr::logger_ptr d_debug_logger; + + virtual ~buffer_double_mapped(); + + /*! + * \brief return number of items worth of space available for writing + */ + virtual int space_available(); + +protected: + /*! + * sets d_vmcircbuf, d_base, d_bufsize. + * returns true iff successful. + */ + bool allocate_buffer(int nitems, size_t sizeof_item); + + virtual unsigned index_add(unsigned a, unsigned b) + { + unsigned s = a + b; + + if (s >= d_bufsize) + s -= d_bufsize; + + assert(s < d_bufsize); + return s; + } + + virtual unsigned index_sub(unsigned a, unsigned b) + { + int s = a - b; + + if (s < 0) + s += d_bufsize; + + assert((unsigned)s < d_bufsize); + return s; + } + +private: + friend class buffer_reader; + + friend GR_RUNTIME_API buffer_sptr make_buffer(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link, + block_sptr buf_owner); + friend GR_RUNTIME_API buffer_sptr make_buffer_double_mapped( + int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, block_sptr link); + + std::unique_ptr<gr::vmcircbuf> d_vmcircbuf; + + /*! + * \brief constructor is private. Use gr_make_buffer to create instances. + * + * Allocate a buffer that holds at least \p nitems of size \p sizeof_item. + * + * \param nitems is the minimum number of items the buffer will hold. + * \param sizeof_item is the size of an item in bytes. + * \param downstream_lcm_nitems is the least common multiple of the items to + * read by downstream blocks + * \param link is the block that writes to this buffer. + * + * The total size of the buffer will be rounded up to a system + * dependent boundary. This is typically the system page size, but + * under MS windows is 64KB. + */ + buffer_double_mapped(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link); +}; + +} /* namespace gr */ + + +#endif /* INCLUDED_GR_RUNTIME_BUFFER_DOUBLE_MAPPED_H */ diff --git a/gnuradio-runtime/include/gnuradio/buffer_reader.h b/gnuradio-runtime/include/gnuradio/buffer_reader.h new file mode 100644 index 0000000000..a7e51d02e9 --- /dev/null +++ b/gnuradio-runtime/include/gnuradio/buffer_reader.h @@ -0,0 +1,192 @@ +/* -*- c++ -*- */ +/* + * Copyright 2004,2009-2011,2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifndef INCLUDED_GR_RUNTIME_BUFFER_READER_H +#define INCLUDED_GR_RUNTIME_BUFFER_READER_H + +#include <gnuradio/api.h> +#include <gnuradio/buffer.h> +#include <gnuradio/logger.h> +#include <gnuradio/runtime_types.h> +#include <gnuradio/tags.h> +#include <gnuradio/thread/thread.h> +#include <boost/weak_ptr.hpp> +#include <map> +#include <memory> + +namespace gr { + +class buffer_reader_sm; + +/*! + * \brief Create a new gr::buffer_reader and attach it to buffer \p buf + * \param buf is the buffer the \p gr::buffer_reader reads from. + * \param nzero_preload -- number of zero items to "preload" into buffer. + * \param link is the block that reads from the buffer using this gr::buffer_reader. + * \param delay Optional setting to declare the buffer's sample delay. + */ +GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf, + int nzero_preload, + block_sptr link = block_sptr(), + int delay = 0); + +//! returns # of buffers currently allocated +GR_RUNTIME_API long buffer_ncurrently_allocated(); + + +// --------------------------------------------------------------------------- + +/*! + * \brief How we keep track of the readers of a gr::buffer. + * \ingroup internal + */ +class GR_RUNTIME_API buffer_reader +{ +public: +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + gr::logger_ptr d_logger; + gr::logger_ptr d_debug_logger; +#endif + + virtual ~buffer_reader(); + + /*! + * Declares the sample delay for this reader. + * + * See gr::block::declare_sample_delay for details. + * + * \param delay The new sample delay + */ + void declare_sample_delay(unsigned delay); + + /*! + * Gets the sample delay for this reader. + * + * See gr::block::sample_delay for details. + */ + unsigned sample_delay() const; + + /*! + * \brief Return number of items available for reading. + */ + virtual int items_available(); // const + + /*! + * \brief Return buffer this reader reads from. + */ + buffer_sptr buffer() const { return d_buffer; } + + /*! + * \brief Return maximum number of items that could ever be available for reading. + * This is used as a sanity check in the scheduler to avoid looping forever. + */ + int max_possible_items_available() const { return d_buffer->bufsize() - 1; } + + /*! + * \brief return pointer to read buffer. + * + * The return value points to items_available() number of items + */ + const void* read_pointer(); + + /* + * \brief tell buffer we read \p items from it + */ + void update_read_pointer(int nitems); + + void set_done(bool done) { d_buffer->set_done(done); } + bool done() const { return d_buffer->done(); } + + gr::thread::mutex* mutex() { return d_buffer->mutex(); } + + uint64_t nitems_read() const { return d_abs_read_offset; } + + void reset_nitem_counter() + { + d_read_index = 0; + d_abs_read_offset = 0; + } + + size_t get_sizeof_item() { return d_buffer->get_sizeof_item(); } + + /*! + * \brief Return the block that reads via this reader. + * + */ + block_sptr link() { return block_sptr(d_link); } + + /*! + * \brief Given a [start,end), returns a vector all tags in the range. + * + * Get a vector of tags in given range. Range of counts is from start to end-1. + * + * Tags are tuples of: + * (item count, source id, key, value) + * + * \param v a vector reference to return tags into + * \param abs_start a uint64 count of the start of the range of interest + * \param abs_end a uint64 count of the end of the range of interest + * \param id the unique ID of the block to make sure already deleted tags + * are not returned + */ + void get_tags_in_range(std::vector<tag_t>& v, + uint64_t abs_start, + uint64_t abs_end, + long id); + + /*! + * \brief Returns true when the current thread is ready to call the callback, + * false otherwise. Note if input_blocked_callback is overridden then this + * function should also be overridden. + */ + virtual bool input_blkd_cb_ready(int items_required) const { return false; } + + /*! + * \brief Callback function that the scheduler will call when it determines + * that the input is blocked. Override this function if needed. + */ + virtual bool input_blocked_callback(int items_required, int items_avail) + { + return false; + } + + // ------------------------------------------------------------------------- + unsigned int get_read_index() const { return d_read_index; } + uint64_t get_abs_read_offset() const { return d_abs_read_offset; } + +protected: + friend class buffer; + friend class buffer_double_mapped; + friend class buffer_single_mapped; + friend class buffer_reader_sm; + + friend GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf, + int nzero_preload, + block_sptr link, + int delay); + + buffer_sptr d_buffer; + unsigned int d_read_index; // in items [0,d->buffer.d_bufsize) ** see NB + uint64_t d_abs_read_offset; // num items seen since the start ** see NB + std::weak_ptr<block> d_link; // block that reads via this buffer reader + unsigned d_attr_delay; // sample delay attribute for tag propagation + // ** NB: buffer::d_mutex protects d_read_index and d_abs_read_offset + + //! constructor is private. Use gr::buffer::add_reader to create instances + buffer_reader(buffer_sptr buffer, unsigned int read_index, block_sptr link); +}; + +//! returns # of buffer_readers currently allocated +GR_RUNTIME_API long buffer_reader_ncurrently_allocated(); + +} /* namespace gr */ + +#endif /* INCLUDED_GR_RUNTIME_BUFFER_READER_H */ diff --git a/gnuradio-runtime/include/gnuradio/buffer_reader_sm.h b/gnuradio-runtime/include/gnuradio/buffer_reader_sm.h new file mode 100644 index 0000000000..627bfd8582 --- /dev/null +++ b/gnuradio-runtime/include/gnuradio/buffer_reader_sm.h @@ -0,0 +1,65 @@ +/* -*- c++ -*- */ +/* + * Copyright 2004,2009-2011,2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifndef INCLUDED_GR_RUNTIME_BUFFER_READER_SM_H +#define INCLUDED_GR_RUNTIME_BUFFER_READER_SM_H + +#include <gnuradio/api.h> +#include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader.h> +#include <gnuradio/logger.h> +#include <gnuradio/runtime_types.h> +#include <gnuradio/tags.h> +#include <gnuradio/thread/thread.h> +#include <boost/weak_ptr.hpp> +#include <map> +#include <memory> + +namespace gr { + +class GR_RUNTIME_API buffer_reader_sm : public buffer_reader +{ +public: + ~buffer_reader_sm(); + + /*! + * \brief Return number of items available for reading. + */ + virtual int items_available(); // const + + /*! + * \brief Return true if thread is ready to call input_blocked_callback, + * false otherwise + */ + virtual bool input_blkd_cb_ready(int items_required) const; + + /*! + * \brief Callback function that the scheduler will call when it determines + * that the input is blocked + */ + virtual bool input_blocked_callback(int items_required, int items_avail); + +private: + friend class buffer; + friend class buffer_double_mapped; + friend class buffer_single_mapped; + + friend GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf, + int nzero_preload, + block_sptr link, + int delay); + + //! constructor is private. Use gr::buffer::add_reader to create instances + buffer_reader_sm(buffer_sptr buffer, unsigned int read_index, block_sptr link); +}; + +} // namespace gr + +#endif /* INCLUDED_GR_RUNTIME_BUFFER_READER_SM_H */ diff --git a/gnuradio-runtime/include/gnuradio/buffer_single_mapped.h b/gnuradio-runtime/include/gnuradio/buffer_single_mapped.h new file mode 100644 index 0000000000..5377534aa8 --- /dev/null +++ b/gnuradio-runtime/include/gnuradio/buffer_single_mapped.h @@ -0,0 +1,167 @@ +/* -*- c++ -*- */ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifndef INCLUDED_GR_RUNTIME_BUFFER_SINGLE_MAPPED_H +#define INCLUDED_GR_RUNTIME_BUFFER_SINGLE_MAPPED_H + +#include <functional> + +#include <gnuradio/api.h> +#include <gnuradio/block.h> +#include <gnuradio/buffer.h> +#include <gnuradio/logger.h> +#include <gnuradio/runtime_types.h> + +namespace gr { + +/*! + * TODO: update this + * + * \brief Single writer, multiple reader fifo. + * \ingroup internal + */ +class GR_RUNTIME_API buffer_single_mapped : public buffer +{ +public: + gr::logger_ptr d_logger; + gr::logger_ptr d_debug_logger; + + virtual ~buffer_single_mapped(); + + /*! + * \brief Return the block that owns this buffer. + */ + block_sptr buf_owner() { return d_buf_owner; } + + /*! + * \brief return number of items worth of space available for writing + */ + virtual int space_available(); + + virtual void update_reader_block_history(unsigned history, int delay) + { + unsigned old_max = d_max_reader_history; + d_max_reader_history = std::max(d_max_reader_history, history); + if (d_max_reader_history != old_max) { + d_write_index = d_max_reader_history - 1; + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "buffer_single_mapped constructor -- set wr index to: " + << d_write_index; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Reset the reader's read index if the buffer's write index has changed. + // Note that "history - 1" is the nzero_preload value passed to + // buffer_add_reader. + for (auto reader : d_readers) { + reader->d_read_index = d_write_index - (reader->link()->history() - 1); + } + } + + // Only attempt to set has history flag if it is not already set + if (!d_has_history) { + // Blocks that set delay may set history to delay + 1 but this is + // not "real" history + d_has_history = ((static_cast<int>(history) - 1) != delay); + } + } + + void deleter(char* ptr) + { + // Delegate free of the underlying buffer to the block that owns it + if (ptr != nullptr) + buf_owner()->free_custom_buffer(ptr); + } + + /*! + * \brief Return true if thread is ready to call the callback, false otherwise + */ + virtual bool output_blkd_cb_ready(int output_multiple); + + /*! + * \brief Callback function that the scheduler will call when it determines + * that the output is blocked + */ + virtual bool output_blocked_callback(int output_multiple, bool force); + +protected: + /*! + * sets d_base, d_bufsize. + * returns true iff successful. + */ + bool allocate_buffer(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems); + + virtual unsigned index_add(unsigned a, unsigned b) + { + unsigned s = a + b; + + if (s >= d_bufsize) + s -= d_bufsize; + + assert(s < d_bufsize); + return s; + } + + virtual unsigned index_sub(unsigned a, unsigned b) + { + // NOTE: a is writer ptr and b is read ptr + int s = a - b; + + if (s < 0) + s = d_bufsize - b; + + assert((unsigned)s < d_bufsize); + return s; + } + +private: + friend class buffer_reader; + + friend GR_RUNTIME_API buffer_sptr make_buffer(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link, + block_sptr buf_owner); + + block_sptr d_buf_owner; // block that "owns" this buffer + + std::unique_ptr<char, std::function<void(char*)>> d_buffer; + + /*! + * \brief constructor is private. Use gr_make_buffer to create instances. + * + * Allocate a buffer that holds at least \p nitems of size \p sizeof_item. + * + * \param nitems is the minimum number of items the buffer will hold. + * \param sizeof_item is the size of an item in bytes. + * \param downstream_lcm_nitems is the least common multiple of the items to + * read by downstream blocks + * \param link is the block that writes to this buffer. + * \param buf_owner if the block that owns the buffer which may or may not + * be the same as the block that writes to this buffer + * + * The total size of the buffer will be rounded up to a system + * dependent boundary. This is typically the system page size, but + * under MS windows is 64KB. + */ + buffer_single_mapped(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link, + block_sptr buf_owner); +}; + +} /* namespace gr */ + + +#endif /* INCLUDED_GR_RUNTIME_BUFFER_SINGLE_MAPPED_H */ diff --git a/gnuradio-runtime/include/gnuradio/buffer_type.h b/gnuradio-runtime/include/gnuradio/buffer_type.h new file mode 100644 index 0000000000..527b5ff8e3 --- /dev/null +++ b/gnuradio-runtime/include/gnuradio/buffer_type.h @@ -0,0 +1,88 @@ +/* -*- c++ -*- */ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifndef INCLUDED_GR_RUNTIME_CUSTOM_BUFFER_TYPE_H +#define INCLUDED_GR_RUNTIME_CUSTOM_BUFFER_TYPE_H + +#include <gnuradio/api.h> + +#include <cstdint> +#include <mutex> +#include <string> + +namespace gr { + + +class GR_RUNTIME_API buffer_type_base +{ +public: + virtual ~buffer_type_base(){}; + + // Do not allow copying or assignment + buffer_type_base(buffer_type_base const&) = delete; + void operator=(buffer_type_base const&) = delete; + + // Allow equality and inequality comparison + bool operator==(const buffer_type_base& other) const + { + return d_value == other.d_value; + } + + bool operator!=(const buffer_type_base& other) const + { + return d_value != other.d_value; + } + + // Do not allow other comparison (just in case) + bool operator<(const buffer_type_base& other) = delete; + bool operator>(const buffer_type_base& other) = delete; + bool operator<=(const buffer_type_base& other) = delete; + bool operator>=(const buffer_type_base& other) = delete; + + const std::string& name() const { return d_name; } + +protected: + static uint32_t s_nextId; + static std::mutex s_mutex; + + uint32_t d_value; + std::string d_name; + + // Private constructor + buffer_type_base(const char* name) : d_name(name) + { + std::lock_guard<std::mutex> lock(s_mutex); + d_value = s_nextId++; + } +}; + +typedef const buffer_type_base& buffer_type_t; + + +#define MAKE_CUSTOM_BUFFER_TYPE(CLASSNAME) \ + class GR_RUNTIME_API buftype_##CLASSNAME : public buffer_type_base \ + { \ + public: \ + static buffer_type_t get() \ + { \ + static buftype_##CLASSNAME instance; \ + return instance; \ + } \ + \ + private: \ + buftype_##CLASSNAME() : buffer_type_base(#CLASSNAME) {} \ + }; + +MAKE_CUSTOM_BUFFER_TYPE(DEFAULT_NON_CUSTOM); +MAKE_CUSTOM_BUFFER_TYPE(CUSTOM_HOST); // used only for test purposes + +} // namespace gr + +#endif /* INCLUDED_GR_RUNTIME_CUSTOM_BUFFER_TYPE_H */
\ No newline at end of file diff --git a/gnuradio-runtime/include/gnuradio/custom_lock.h b/gnuradio-runtime/include/gnuradio/custom_lock.h new file mode 100644 index 0000000000..6422254d99 --- /dev/null +++ b/gnuradio-runtime/include/gnuradio/custom_lock.h @@ -0,0 +1,64 @@ +/* -*- c++ -*- */ +/* + * Copyright 2021 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifndef INCLUDED_GR_CUSTOM_LOCK_H +#define INCLUDED_GR_CUSTOM_LOCK_H + +#include <gnuradio/api.h> +#include <gnuradio/logger.h> +#include <gnuradio/thread/thread.h> + +namespace gr { + +/*! + * Custom lock interface. Objects should implement this interface in order to + * use the custom_lock object below. + */ +class custom_lock_if +{ +public: + /*! + * This function will be executed on construction of the custom lock. + */ + virtual void on_lock(gr::thread::scoped_lock& lock) = 0; + + /*! + * This function will be executed on destruction of the custom lock. + */ + virtual void on_unlock() = 0; +}; + +/*! + * Write me! + */ +class custom_lock +{ +public: + explicit custom_lock(gr::thread::mutex& mutex, std::shared_ptr<custom_lock_if> locker) + : d_mutex(mutex), d_lock(mutex), d_locker(locker) + { + d_locker->on_lock(d_lock); + } + + ~custom_lock() { d_locker->on_unlock(); } + + // Disallow copying and assignment + custom_lock(custom_lock const&) = delete; + custom_lock& operator=(custom_lock const&) = delete; + +private: + gr::thread::mutex& d_mutex; + gr::thread::scoped_lock d_lock; + std::shared_ptr<custom_lock_if> d_locker; +}; + +} /* namespace gr */ + +#endif /* INCLUDED_GR_CUSTOM_LOCK_H */
\ No newline at end of file diff --git a/gnuradio-runtime/include/gnuradio/logger.h b/gnuradio-runtime/include/gnuradio/logger.h index 6891706fb0..e9409c93a3 100644 --- a/gnuradio-runtime/include/gnuradio/logger.h +++ b/gnuradio-runtime/include/gnuradio/logger.h @@ -40,7 +40,7 @@ namespace gr { * \brief GR_LOG macros * \ingroup logging * - * These macros wrap the standard LOG4CPP_LEVEL macros. The availablie macros + * These macros wrap the standard LOG4CPP_LEVEL macros. The available macros * are: * LOG_DEBUG * LOG_INFO diff --git a/gnuradio-runtime/lib/CMakeLists.txt b/gnuradio-runtime/lib/CMakeLists.txt index 42486de801..ca96549386 100644 --- a/gnuradio-runtime/lib/CMakeLists.txt +++ b/gnuradio-runtime/lib/CMakeLists.txt @@ -55,6 +55,11 @@ add_library(gnuradio-runtime block_gateway_impl.cc block_registry.cc buffer.cc + buffer_double_mapped.cc + buffer_reader.cc + buffer_reader_sm.cc + buffer_single_mapped.cc + buffer_type.cc flat_flowgraph.cc flowgraph.cc hier_block2.cc diff --git a/gnuradio-runtime/lib/block.cc b/gnuradio-runtime/lib/block.cc index cddf5a5baa..bb6ce95298 100644 --- a/gnuradio-runtime/lib/block.cc +++ b/gnuradio-runtime/lib/block.cc @@ -24,6 +24,12 @@ namespace gr { +// Moved from flat_flowgraph.cc +// 32Kbyte buffer size between blocks +#define GR_FIXED_BUFFER_SIZE (32 * (1L << 10)) + +static const unsigned int s_fixed_buffer_size = GR_FIXED_BUFFER_SIZE; + block::block(const std::string& name, io_signature::sptr input_signature, io_signature::sptr output_signature) @@ -375,11 +381,146 @@ void block::set_min_output_buffer(int port, long min_output_buffer) d_min_output_buffer[port] = min_output_buffer; } +void block::allocate_detail(int ninputs, + int noutputs, + const std::vector<int>& downstream_max_nitems_vec, + const std::vector<uint64_t>& downstream_lcm_nitems_vec) +{ + block_detail_sptr detail = make_block_detail(ninputs, noutputs); + + GR_LOG_DEBUG(d_debug_logger, "Creating block detail for " + identifier()); + + for (int i = 0; i < noutputs; i++) { + expand_minmax_buffer(i); + + buffer_sptr buffer = allocate_buffer( + i, downstream_max_nitems_vec[i], downstream_lcm_nitems_vec[i]); + GR_LOG_DEBUG(d_debug_logger, + "Allocated buffer for output " + identifier() + " " + + std::to_string(i)); + detail->set_output(i, buffer); + + // Update the block's max_output_buffer based on what was actually allocated. + if ((max_output_buffer(i) != buffer->bufsize()) && (max_output_buffer(i) != -1)) + GR_LOG_WARN(d_logger, + boost::format("Block (%1%) max output buffer set to %2%" + " instead of requested %3%") % + alias() % buffer->bufsize() % max_output_buffer(i)); + set_max_output_buffer(i, buffer->bufsize()); + } + + // Store the block_detail that was created above + set_detail(detail); +} + +buffer_sptr block::replace_buffer(uint32_t out_port, block_sptr block_owner) +{ + block_detail_sptr detail_ = detail(); + buffer_sptr orig_buffer = detail_->output(out_port); + + // Make a new buffer but this time use the passed in block as the owner + buffer_sptr new_buffer = make_buffer(orig_buffer->bufsize(), + orig_buffer->get_sizeof_item(), + orig_buffer->get_downstream_lcm_nitems(), + shared_from_base<block>(), + block_owner); + + detail_->set_output(out_port, new_buffer); + return new_buffer; +} bool block::update_rate() const { return d_update_rate; } void block::enable_update_rate(bool en) { d_update_rate = en; } +buffer_sptr block::allocate_buffer(int port, + int downstream_max_nitems, + uint64_t downstream_lcm_nitems) +{ + int item_size = output_signature()->sizeof_stream_item(port); + + // *2 because we're now only filling them 1/2 way in order to + // increase the available parallelism when using the TPB scheduler. + // (We're double buffering, where we used to single buffer) + int nitems = s_fixed_buffer_size * 2 / item_size; + + // Make sure there are at least twice the output_multiple no. of items + if (nitems < 2 * output_multiple()) // Note: this means output_multiple() + nitems = 2 * output_multiple(); // can't be changed by block dynamically + + // Limit buffer size if indicated + if (max_output_buffer(port) > 0) { + // std::cout << "constraining output items to " << block->max_output_buffer(port) + // << "\n"; + nitems = std::min((long)nitems, (long)max_output_buffer(port)); + nitems -= nitems % output_multiple(); + if (nitems < 1) + throw std::runtime_error("problems allocating a buffer with the given max " + "output buffer constraint!"); + } else if (min_output_buffer(port) > 0) { + nitems = std::max((long)nitems, (long)min_output_buffer(port)); + nitems -= nitems % output_multiple(); + if (nitems < 1) + throw std::runtime_error("problems allocating a buffer with the given min " + "output buffer constraint!"); + } + + // If any downstream blocks are decimators and/or have a large output_multiple, + // ensure we have a buffer at least twice their decimation factor*output_multiple + nitems = std::max(nitems, downstream_max_nitems); + + // We're going to let this fail once and retry. If that fails, throw and exit. + buffer_sptr buf; + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + GR_LOG_DEBUG(d_logger, + "Block: " + name() + " allocated buffer for output " + identifier()); +#endif + + try { +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + std::ostringstream msg; + msg << "downstream_max_nitems: " << downstream_max_nitems + << " -- downstream_lcm_nitems: " << downstream_lcm_nitems + << " -- output_multiple(): " << output_multiple() + << " -- out_mult_set: " << output_multiple_set() << " -- nitems: " << nitems + << " -- history: " << history() << " -- relative_rate: " << relative_rate(); + if (relative_rate() != 1.0) { + msg << " (" << relative_rate_i() << " / " << relative_rate_d() << ")"; + } + msg << " -- fixed_rate: " << fixed_rate(); + if (fixed_rate()) { + int num_inputs = fixed_rate_noutput_to_ninput(1) - (history() - 1); + msg << " (" << num_inputs << " -> " + << fixed_rate_ninput_to_noutput(num_inputs + (history() - 1)) << ")"; + } + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + buf = make_buffer(nitems, + item_size, + downstream_lcm_nitems, + shared_from_base<block>(), + shared_from_base<block>()); + + } catch (std::bad_alloc&) { + buf = make_buffer(nitems, + item_size, + downstream_lcm_nitems, + shared_from_base<block>(), + shared_from_base<block>()); + } + + // Set the max noutput items size here to make sure it's always + // set in the block and available in the start() method. + // But don't overwrite if the user has set this externally. + if (!is_set_max_noutput_items()) + set_max_noutput_items(nitems); + + return buf; +} + float block::pc_noutput_items() { if (d_detail) { diff --git a/gnuradio-runtime/lib/block_detail.cc b/gnuradio-runtime/lib/block_detail.cc index 239223603d..f5283c56b0 100644 --- a/gnuradio-runtime/lib/block_detail.cc +++ b/gnuradio-runtime/lib/block_detail.cc @@ -14,6 +14,7 @@ #include <gnuradio/block_detail.h> #include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader.h> #include <gnuradio/logger.h> namespace gr { diff --git a/gnuradio-runtime/lib/block_executor.cc b/gnuradio-runtime/lib/block_executor.cc index c026934bd9..6fbf2c5c17 100644 --- a/gnuradio-runtime/lib/block_executor.cc +++ b/gnuradio-runtime/lib/block_executor.cc @@ -14,13 +14,13 @@ #include <gnuradio/block.h> #include <gnuradio/block_detail.h> -#include <gnuradio/buffer.h> -#include <gnuradio/logger.h> +#include <gnuradio/custom_lock.h> #include <gnuradio/prefs.h> #include <block_executor.h> #include <limits> #include <sstream> + namespace gr { // must be defined to either 0 or 1 @@ -53,27 +53,55 @@ inline static unsigned int round_down(unsigned int n, unsigned int multiple) // buffers or -1 if we're output blocked and the output we're // blocked on is done. // -static int -min_available_space(block_detail* d, int output_multiple, int min_noutput_items) +static int min_available_space(block* m, + block_detail* d, + int output_multiple, + int min_noutput_items, + int& output_idx) { + gr::logger_ptr logger; + gr::logger_ptr debug_logger; + gr::configure_default_loggers(logger, debug_logger, "min_available_space"); + int min_space = std::numeric_limits<int>::max(); if (min_noutput_items == 0) min_noutput_items = 1; - for (int i = 0; i < d->noutputs(); i++) { + for (int i = output_idx; i < d->noutputs(); i++) { buffer_sptr out_buf = d->output(i); gr::thread::scoped_lock guard(*out_buf->mutex()); - int avail_n = round_down(out_buf->space_available(), output_multiple); + int space_avail = out_buf->space_available(); + int avail_n = round_down(space_avail, output_multiple); + // If not strictly output multiple size aligned, potentially use all + // space available in the output buffer + if (avail_n == 0 && space_avail < output_multiple && !m->output_multiple_set()) { + avail_n = std::max(avail_n, space_avail); + } int best_n = round_down(out_buf->bufsize() / 2, output_multiple); if (best_n < min_noutput_items) throw std::runtime_error("Buffer too small for min_noutput_items"); int n = std::min(avail_n, best_n); if (n < min_noutput_items) { // We're blocked on output. - if (out_buf->done()) { // Downstream is done, therefore we're done. + LOG(std::ostringstream msg; + msg << m << " **[i=" << i << "] output_multiple=" << output_multiple + << " min_noutput_items=" << min_noutput_items + << " avail_n=" << avail_n << " best_n=" << best_n << " n=" << n + << " min_space=" << min_space << " outbuf_done=" << out_buf->done(); + GR_LOG_INFO(debug_logger, msg.str());); + + if (out_buf->done()) { // Downstream is done, therefore we're done. return -1; } + + output_idx = i; return 0; } min_space = std::min(min_space, n); + + LOG(std::ostringstream msg; + msg << m << " [i=" << i << "] output_multiple=" << output_multiple + << " min_noutput_items=" << min_noutput_items << " avail_n=" << avail_n + << " best_n=" << best_n << " n=" << n << " min_space=" << min_space; + GR_LOG_INFO(debug_logger, msg.str());); } return min_space; } @@ -233,6 +261,7 @@ block_executor::state block_executor::run_one_iteration() int max_noutput_items; int new_alignment = 0; int alignment_state = -1; + int output_idx = 0; block* m = d_block.get(); block_detail* d = m->detail().get(); @@ -255,8 +284,10 @@ block_executor::state block_executor::run_one_iteration() d_start_nitems_read.resize(0); // determine the minimum available output space - noutput_items = - min_available_space(d, m->output_multiple(), m->min_noutput_items()); + output_idx = 0; + out_try_again: + noutput_items = min_available_space( + m, d, m->output_multiple(), m->min_noutput_items(), output_idx); noutput_items = std::min(noutput_items, max_noutput_items); LOG(std::ostringstream msg; msg << "source: noutput_items = " << noutput_items; GR_LOG_INFO(d_debug_logger, msg.str());); @@ -264,8 +295,28 @@ block_executor::state block_executor::run_one_iteration() goto were_done; if (noutput_items == 0) { // we're output blocked - LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT");); - return BLKD_OUT; + LOG(std::ostringstream msg; msg << m << " -- BLKD_OUT"; + GR_LOG_INFO(d_debug_logger, msg.str());); + + buffer_sptr out_buf = d->output(output_idx); + + if (out_buf->output_blkd_cb_ready(m->output_multiple())) { + gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf); + if (!out_buf->output_blocked_callback(m->output_multiple())) { + LOG(std::ostringstream msg; + msg << m << " -- BLKD_OUT -- ([1] callback FAILED)"; + GR_LOG_INFO(d_debug_logger, msg.str());); + return BLKD_OUT; + } else { + LOG(std::ostringstream msg; + msg << m << " -- BLKD_OUT -- ([1] try again idx: " << output_idx + << ")"; + GR_LOG_INFO(d_debug_logger, msg.str());); + goto out_try_again; + } + } else { + return BLKD_OUT; + } } goto setup_call_to_work; // jump to common code @@ -278,14 +329,14 @@ block_executor::state block_executor::run_one_iteration() d_input_done.resize(d->ninputs()); d_output_items.resize(0); d_start_nitems_read.resize(d->ninputs()); - LOG(GR_LOG_INFO(d_debug_logger, "sink");); + // LOG(GR_LOG_INFO(d_debug_logger, "sink");); + LOG(std::ostringstream msg; msg << m << " -- sink"; + GR_LOG_INFO(d_debug_logger, msg.str());); max_items_avail = 0; for (int i = 0; i < d->ninputs(); i++) { { - /* - * Acquire the mutex and grab local copies of items_available and done. - */ + // Acquire the mutex and grab local copies of items_available and done. buffer_reader_sptr in_buf = d->input(i); gr::thread::scoped_lock guard(*in_buf->mutex()); d_ninput_items[i] = in_buf->items_available(); @@ -293,10 +344,10 @@ block_executor::state block_executor::run_one_iteration() } LOG(std::ostringstream msg; - msg << "d_ninput_items[" << i << "] = " << d_ninput_items[i]; + msg << m << " -- d_ninput_items[" << i << "] = " << d_ninput_items[i]; GR_LOG_INFO(d_debug_logger, msg.str());); LOG(std::ostringstream msg; - msg << "d_input_done[" << i << "] = " << d_input_done[i]; + msg << m << " -- d_input_done[" << i << "] = " << d_input_done[i]; GR_LOG_INFO(d_debug_logger, msg.str());); if (d_ninput_items[i] < m->output_multiple() && d_input_done[i]) @@ -309,13 +360,16 @@ block_executor::state block_executor::run_one_iteration() noutput_items = (int)(max_items_avail * m->relative_rate()); noutput_items = round_down(noutput_items, m->output_multiple()); noutput_items = std::min(noutput_items, max_noutput_items); - LOG(std::ostringstream msg; msg << "max_items_avail = " << max_items_avail; + LOG(std::ostringstream msg; + msg << m << " -- max_items_avail = " << max_items_avail; GR_LOG_INFO(d_debug_logger, msg.str());); - LOG(std::ostringstream msg; msg << "noutput_items = " << noutput_items; + LOG(std::ostringstream msg; msg << m << " -- noutput_items = " << noutput_items; GR_LOG_INFO(d_debug_logger, msg.str());); if (noutput_items == 0) { // we're blocked on input - LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN");); + // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN");); + LOG(std::ostringstream msg; msg << m << " -- BLKD_IN"; + GR_LOG_INFO(d_debug_logger, msg.str())); return BLKD_IN; } @@ -331,12 +385,11 @@ block_executor::state block_executor::run_one_iteration() d_output_items.resize(d->noutputs()); d_start_nitems_read.resize(d->ninputs()); + blkd_in_try_again: max_items_avail = 0; for (int i = 0; i < d->ninputs(); i++) { { - /* - * Acquire the mutex and grab local copies of items_available and done. - */ + // Acquire the mutex and grab local copies of items_available and done. buffer_reader_sptr in_buf = d->input(i); gr::thread::scoped_lock guard(*in_buf->mutex()); d_ninput_items[i] = in_buf->items_available(); @@ -346,11 +399,13 @@ block_executor::state block_executor::run_one_iteration() } // determine the minimum available output space - noutput_items = - min_available_space(d, m->output_multiple(), m->min_noutput_items()); + output_idx = 0; + out_try_again2: + noutput_items = min_available_space( + m, d, m->output_multiple(), m->min_noutput_items(), output_idx); if (ENABLE_LOGGING) { std::ostringstream msg; - msg << "regular "; + msg << m << " -- regular "; msg << m->relative_rate_i() << ":" << m->relative_rate_d(); msg << " max_items_avail = " << max_items_avail; msg << " noutput_items = " << noutput_items; @@ -360,13 +415,36 @@ block_executor::state block_executor::run_one_iteration() goto were_done; if (noutput_items == 0) { // we're output blocked - LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT");); - return BLKD_OUT; + // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT");); + LOG(std::ostringstream msg; msg << m << " -- BLKD_OUT"; + GR_LOG_INFO(d_debug_logger, msg.str())); + + buffer_sptr out_buf = d->output(output_idx); + + if (out_buf->output_blkd_cb_ready(m->output_multiple())) { + // Call the output blocked callback which will tell us if it was + // able to unblock the output + gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf); + if (!out_buf->output_blocked_callback(m->output_multiple())) { + LOG(std::ostringstream msg; + msg << m << " -- BLKD_OUT -- ([2] callback FAILED)"; + GR_LOG_INFO(d_debug_logger, msg.str());); + return BLKD_OUT; + } else { + LOG(std::ostringstream msg; + msg << m << " -- BLKD_OUT -- ([2] try again idx: " << output_idx + << ")"; + GR_LOG_INFO(d_debug_logger, msg.str());); + goto out_try_again2; + } + } else { + return BLKD_OUT; + } } try_again: if (m->fixed_rate()) { - // try to work it forward starting with max_items_avail. + // Try to work it forward starting with max_items_avail. // We want to try to consume all the input we've got. int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail); @@ -416,6 +494,10 @@ block_executor::state block_executor::run_one_iteration() // ask the block how much input they need to produce noutput_items m->forecast(noutput_items, d_ninput_items_required); + LOG(std::ostringstream msg; + msg << m << " -- FCAST noutput_items=" << noutput_items << " inputs_required=" + << d_ninput_items_required[0] << " inputs_avail=" << d_ninput_items[0]; + GR_LOG_INFO(d_debug_logger, msg.str())); // See if we've got sufficient input available and make sure we // didn't overflow on the input. @@ -445,12 +527,29 @@ block_executor::state block_executor::run_one_iteration() } // We're blocked on input - LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN");); + LOG(std::ostringstream msg; msg << m << " -- BLKD_IN"; + GR_LOG_INFO(d_debug_logger, msg.str())); + + buffer_reader_sptr in_buf = d->input(i); + + LOG(std::ostringstream msg; + msg << m << " (t: " << this << ") -- pre-callback"; + GR_LOG_DEBUG(d_debug_logger, msg.str())); + + if (in_buf->input_blkd_cb_ready(d_ninput_items_required[i])) { + gr::custom_lock lock(std::ref(*in_buf->mutex()), in_buf->buffer()); + if (in_buf->input_blocked_callback(d_ninput_items_required[i], + d_ninput_items[i])) { + LOG(std::ostringstream msg; msg << m << " -- BLKD_IN -- TRY AGAIN"; + GR_LOG_INFO(d_debug_logger, msg.str())); + goto blkd_in_try_again; + } + } + if (d_input_done[i]) // If the upstream block is done, we're done goto were_done; // Is it possible to ever fulfill this request? - buffer_reader_sptr in_buf = d->input(i); if (d_ninput_items_required[i] > in_buf->max_possible_items_available()) { // Nope, never going to happen... std::ostringstream msg; @@ -476,14 +575,18 @@ block_executor::state block_executor::run_one_iteration() // We've got enough data on each input to produce noutput_items. // Finish setting up the call to work. - for (int i = 0; i < d->ninputs(); i++) + for (int i = 0; i < d->ninputs(); i++) { + d->input(i)->buffer()->increment_active(); d_input_items[i] = d->input(i)->read_pointer(); + } setup_call_to_work: d->d_produce_or = 0; - for (int i = 0; i < d->noutputs(); i++) + for (int i = 0; i < d->noutputs(); i++) { + d->output(i)->increment_active(); d_output_items[i] = d->output(i)->write_pointer(); + } // determine where to start looking for new tags for (int i = 0; i < d->ninputs(); i++) @@ -504,7 +607,10 @@ block_executor::state block_executor::run_one_iteration() #endif /* GR_PERFORMANCE_COUNTERS */ LOG(std::ostringstream msg; - msg << "general_work: noutput_items = " << noutput_items << " result = " << n; + msg << m << " -- general_work: noutput_items = " << noutput_items + << " ninput_items=" << (d->ninputs() >= 1 ? d_ninput_items[0] : 0) + << " ninput_req=" << (d->ninputs() >= 1 ? d_ninput_items_required[0] : 0) + << " result = " << n; GR_LOG_INFO(d_debug_logger, msg.str());); // Adjust number of unaligned items left to process @@ -521,14 +627,21 @@ block_executor::state block_executor::run_one_iteration() m->mp_relative_rate(), m->update_rate(), d_returned_tags, - m->unique_id())) + m->unique_id())) { + d->post_work_cleanup(); goto were_done; + } - if (n == block::WORK_DONE) + if (n == block::WORK_DONE) { + d->post_work_cleanup(); goto were_done; + } - if (n != block::WORK_CALLED_PRODUCE) + if (n != block::WORK_CALLED_PRODUCE) { d->produce_each(n); // advance write pointers + } + + d->post_work_cleanup(); // For some blocks that can change their produce/consume ratio // (the relative_rate), we might want to automatically update @@ -558,13 +671,30 @@ block_executor::state block_executor::run_one_iteration() } */ + // The call to general_work() produced no output therefore the block may + // be "effectively output blocked". Call the output blocked callback + // just in case, it can do no harm. + for (int i = 0; i < d->noutputs(); i++) { + buffer_sptr out_buf = d->output(i); + LOG(std::ostringstream msg; + msg << m << " -- NO OUTPUT -- [" << i << "] -- OUTPUT BLOCKED"; + GR_LOG_DEBUG(d_debug_logger, msg.str());); + gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf); + out_buf->output_blocked_callback(m->output_multiple(), true); + LOG(std::ostringstream msg; msg << m << " -- NO OUTPUT -- [" << i + << "] -- OUTPUT BLOCKED CBACK: " << rc; + GR_LOG_DEBUG(d_debug_logger, msg.str());); + } + // Have the caller try again... return READY_NO_OUTPUT; } GR_LOG_ERROR(d_logger, "invalid state while going through iteration state machine"); were_done: - LOG(GR_LOG_INFO(d_debug_logger, "we're done");); + // LOG(GR_LOG_INFO(d_debug_logger, "we're done");); + LOG(std::ostringstream msg; msg << m << " -- we're done"; + GR_LOG_INFO(d_debug_logger, msg.str())); d->set_done(true); return DONE; } diff --git a/gnuradio-runtime/lib/buffer.cc b/gnuradio-runtime/lib/buffer.cc index cb6b949932..7fd0a39579 100644 --- a/gnuradio-runtime/lib/buffer.cc +++ b/gnuradio-runtime/lib/buffer.cc @@ -13,6 +13,10 @@ #endif #include "vmcircbuf.h" #include <gnuradio/buffer.h> +#include <gnuradio/buffer_double_mapped.h> +#include <gnuradio/buffer_reader.h> +#include <gnuradio/buffer_single_mapped.h> +#include <gnuradio/buffer_type.h> #include <gnuradio/integer_math.h> #include <gnuradio/math.h> #include <boost/format.hpp> @@ -23,7 +27,6 @@ namespace gr { static long s_buffer_count = 0; // counts for debugging storage mgmt -static long s_buffer_reader_count = 0; /* ---------------------------------------------------------------------------- Notes on storage management @@ -52,117 +55,81 @@ static long s_buffer_reader_count = 0; gr::buffer_reader goes to zero, we can successfully reclaim it. ---------------------------------------------------------------------------- */ -/* - * Compute the minimum number of buffer items that work (i.e., - * address space wrap-around works). To work is to satisfy this - * constraint for integer buffer_size and k: - * - * type_size * nitems == k * page_size - */ -static inline long minimum_buffer_items(long type_size, long page_size) -{ - return page_size / GR_GCD(type_size, page_size); -} - -buffer::buffer(int nitems, size_t sizeof_item, block_sptr link) +buffer::buffer(BufferMappingType buf_type, + int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link) : d_base(0), d_bufsize(0), + d_buf_map_type(buf_type), d_max_reader_delay(0), + d_max_reader_history(1), + d_has_history(false), d_sizeof_item(sizeof_item), d_link(link), d_write_index(0), d_abs_write_offset(0), d_done(false), - d_last_min_items_read(0) + d_last_min_items_read(0), + d_callback_flag(false), + d_active_pointer_counter(0), + d_downstream_lcm_nitems(downstream_lcm_nitems), + d_write_multiple(0) { gr::configure_default_loggers(d_logger, d_debug_logger, "buffer"); - if (!allocate_buffer(nitems, sizeof_item)) - throw std::bad_alloc(); s_buffer_count++; } -buffer_sptr make_buffer(int nitems, size_t sizeof_item, block_sptr link) +buffer_sptr make_buffer(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link, + block_sptr buf_owner) { - return buffer_sptr(new buffer(nitems, sizeof_item, link)); -} +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + gr::logger_ptr logger; + gr::logger_ptr debug_logger; + gr::configure_default_loggers(logger, debug_logger, "make_buffer"); + std::ostringstream msg; +#endif -buffer::~buffer() -{ - assert(d_readers.size() == 0); - s_buffer_count--; -} +#if DEBUG_SINGLE_MAPPED + if (1) { +#else + if (buf_owner->get_buffer_type() != buftype_DEFAULT_NON_CUSTOM::get()) { +#endif + // Buffer type is NOT the default non custom variety so allocate a + // buffer_single_mapped instance +#ifdef BUFFER_DEBUG + msg << "buffer_single_mapped nitems: " << nitems + << " -- sizeof_item: " << sizeof_item; + GR_LOG_DEBUG(logger, msg.str()); +#endif -/*! - * sets d_vmcircbuf, d_base, d_bufsize. - * returns true iff successful. - */ -bool buffer::allocate_buffer(int nitems, size_t sizeof_item) -{ - int orig_nitems = nitems; - - // Any buffersize we come up with must be a multiple of min_nitems. - int granularity = gr::vmcircbuf_sysconfig::granularity(); - int min_nitems = minimum_buffer_items(sizeof_item, granularity); - - // Round-up nitems to a multiple of min_nitems. - if (nitems % min_nitems != 0) - nitems = ((nitems / min_nitems) + 1) * min_nitems; - - // If we rounded-up a whole bunch, give the user a heads up. - // This only happens if sizeof_item is not a power of two. - - if (nitems > 2 * orig_nitems && nitems * (int)sizeof_item > granularity) { - auto msg = - str(boost::format( - "allocate_buffer: tried to allocate" - " %d items of size %d. Due to alignment requirements" - " %d were allocated. If this isn't OK, consider padding" - " your structure to a power-of-two bytes." - " On this platform, our allocation granularity is %d bytes.") % - orig_nitems % sizeof_item % nitems % granularity); - GR_LOG_WARN(d_logger, msg.c_str()); - } + return buffer_sptr(new buffer_single_mapped( + nitems, sizeof_item, downstream_lcm_nitems, link, buf_owner)); - d_bufsize = nitems; - d_vmcircbuf.reset(gr::vmcircbuf_sysconfig::make(d_bufsize * d_sizeof_item)); - if (d_vmcircbuf == 0) { - std::ostringstream msg; - msg << "gr::buffer::allocate_buffer: failed to allocate buffer of size " - << d_bufsize * d_sizeof_item / 1024 << " KB"; - GR_LOG_ERROR(d_logger, msg.str()); - return false; - } + } else { + // Default to allocating a buffer_double_mapped instance +#ifdef BUFFER_DEBUG + msg << "buffer_double_mapped nitems: " << nitems + << " -- sizeof_item: " << sizeof_item; + GR_LOG_DEBUG(logger, msg.str()); +#endif - d_base = (char*)d_vmcircbuf->pointer_to_first_copy(); - return true; + return buffer_sptr( + new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link)); + } } -int buffer::space_available() +buffer::~buffer() { - if (d_readers.empty()) - return d_bufsize - 1; // See comment below - - else { - // Find out the maximum amount of data available to our readers - - int most_data = d_readers[0]->items_available(); - uint64_t min_items_read = d_readers[0]->nitems_read(); - for (size_t i = 1; i < d_readers.size(); i++) { - most_data = std::max(most_data, d_readers[i]->items_available()); - min_items_read = std::min(min_items_read, d_readers[i]->nitems_read()); - } - - if (min_items_read != d_last_min_items_read) { - prune_tags(d_last_min_items_read); - d_last_min_items_read = min_items_read; - } - - // The -1 ensures that the case d_write_index == d_read_index is - // unambiguous. It indicates that there is no data for the reader - return d_bufsize - most_data - 1; - } + assert(d_readers.size() == 0); + s_buffer_count--; } void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; } @@ -170,8 +137,22 @@ void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; } void buffer::update_write_pointer(int nitems) { gr::thread::scoped_lock guard(*mutex()); + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + unsigned orig_wr_idx = d_write_index; +#endif + d_write_index = index_add(d_write_index, nitems); d_abs_write_offset += nitems; + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + std::ostringstream msg; + msg << "[" << this << "] update_write_pointer -- orig d_write_index: " << orig_wr_idx + << " -- nitems: " << nitems << " -- d_write_index: " << d_write_index; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif } void buffer::set_done(bool done) @@ -180,20 +161,6 @@ void buffer::set_done(bool done) d_done = done; } -buffer_reader_sptr -buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay) -{ - if (nzero_preload < 0) - throw std::invalid_argument("buffer_add_reader: nzero_preload must be >= 0"); - - buffer_reader_sptr r( - new buffer_reader(buf, buf->index_sub(buf->d_write_index, nzero_preload), link)); - r->declare_sample_delay(delay); - buf->d_readers.push_back(r.get()); - - return r; -} - void buffer::drop_reader(buffer_reader* reader) { std::vector<buffer_reader*>::iterator result = @@ -263,92 +230,40 @@ void buffer::prune_tags(uint64_t max_time) } } -long buffer_ncurrently_allocated() { return s_buffer_count; } - -// ---------------------------------------------------------------------------- - -buffer_reader::buffer_reader(buffer_sptr buffer, unsigned int read_index, block_sptr link) - : d_buffer(buffer), - d_read_index(read_index), - d_abs_read_offset(0), - d_link(link), - d_attr_delay(0) -{ - s_buffer_reader_count++; -} - -buffer_reader::~buffer_reader() +void buffer::on_lock(gr::thread::scoped_lock& lock) { - d_buffer->drop_reader(this); - s_buffer_reader_count--; + // NOTE: the protecting mutex (scoped_lock) is held by the custom_lock object + + // Wait until no other callback is active and no pointers are active for + // the buffer, then mark the callback flag active. + d_cv.wait(lock, [this]() { + return (d_callback_flag == false && d_active_pointer_counter == 0); + }); + d_callback_flag = true; } -void buffer_reader::declare_sample_delay(unsigned delay) +void buffer::on_unlock() { - d_attr_delay = delay; - d_buffer->d_max_reader_delay = std::max(d_attr_delay, d_buffer->d_max_reader_delay); -} - -unsigned buffer_reader::sample_delay() const { return d_attr_delay; } - -int buffer_reader::items_available() const -{ - return d_buffer->index_sub(d_buffer->d_write_index, d_read_index); -} + // NOTE: the protecting mutex (scoped_lock) is held by the custom_lock object -const void* buffer_reader::read_pointer() -{ - return &d_buffer->d_base[d_read_index * d_buffer->d_sizeof_item]; + // Mark the callback flag inactive and notify anyone waiting + d_callback_flag = false; + d_cv.notify_all(); } -void buffer_reader::update_read_pointer(int nitems) -{ - gr::thread::scoped_lock guard(*mutex()); - d_read_index = d_buffer->index_add(d_read_index, nitems); - d_abs_read_offset += nitems; -} +long buffer_ncurrently_allocated() { return s_buffer_count; } -void buffer_reader::get_tags_in_range(std::vector<tag_t>& v, - uint64_t abs_start, - uint64_t abs_end, - long id) +std::ostream& operator<<(std::ostream& os, const buffer& buf) { - gr::thread::scoped_lock guard(*mutex()); - - uint64_t lower_bound = abs_start - d_attr_delay; - // check for underflow and if so saturate at 0 - if (lower_bound > abs_start) - lower_bound = 0; - uint64_t upper_bound = abs_end - d_attr_delay; - // check for underflow and if so saturate at 0 - if (upper_bound > abs_end) - upper_bound = 0; - - v.clear(); - std::multimap<uint64_t, tag_t>::iterator itr = - d_buffer->get_tags_lower_bound(lower_bound); - std::multimap<uint64_t, tag_t>::iterator itr_end = - d_buffer->get_tags_upper_bound(upper_bound); - - uint64_t item_time; - while (itr != itr_end) { - item_time = (*itr).second.offset + d_attr_delay; - if ((item_time >= abs_start) && (item_time < abs_end)) { - std::vector<long>::iterator id_itr; - id_itr = std::find( - itr->second.marked_deleted.begin(), itr->second.marked_deleted.end(), id); - // If id is not in the vector of marked blocks - if (id_itr == itr->second.marked_deleted.end()) { - tag_t t = (*itr).second; - t.offset += d_attr_delay; - v.push_back(t); - v.back().marked_deleted.clear(); - } - } - itr++; + os << std::endl + << " sz: " << buf.d_bufsize << std::endl + << " nrdrs: " << buf.d_readers.size() << std::endl; + for (auto& rdr : buf.d_readers) { + os << " rd_idx: " << rdr->get_read_index() << std::endl + << " abs_rd_offset: " << rdr->get_abs_read_offset() << std::endl + << std::endl; } + return os; } -long buffer_reader_ncurrently_allocated() { return s_buffer_reader_count; } - } /* namespace gr */ diff --git a/gnuradio-runtime/lib/buffer_double_mapped.cc b/gnuradio-runtime/lib/buffer_double_mapped.cc new file mode 100644 index 0000000000..ad99c2162b --- /dev/null +++ b/gnuradio-runtime/lib/buffer_double_mapped.cc @@ -0,0 +1,155 @@ +/* -*- c++ -*- */ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include "vmcircbuf.h" +#include <gnuradio/block.h> +#include <gnuradio/buffer_double_mapped.h> +#include <gnuradio/buffer_reader.h> +#include <gnuradio/integer_math.h> +#include <gnuradio/math.h> +#include <assert.h> +#include <algorithm> +#include <iostream> +#include <stdexcept> + +namespace gr { + +/* + * Compute the minimum number of buffer items that work (i.e., + * address space wrap-around works). To work is to satisfy this + * constraint for integer buffer_size and k: + * + * type_size * nitems == k * page_size + */ +static inline long minimum_buffer_items(long type_size, long page_size) +{ + return page_size / GR_GCD(type_size, page_size); +} + + +buffer_double_mapped::buffer_double_mapped(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link) + : buffer(BufferMappingType::DoubleMapped, + nitems, + sizeof_item, + downstream_lcm_nitems, + link) +{ + gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_double_mapped"); + if (!allocate_buffer(nitems, sizeof_item)) + throw std::bad_alloc(); + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + { + std::ostringstream msg; + msg << "[" << this << "] " + << "buffer_double_mapped constructor -- history: " << link->history(); + GR_LOG_DEBUG(d_logger, msg.str()); + } +#endif +} + +buffer_sptr make_buffer_double_mapped(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link) +{ + return buffer_sptr( + new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link)); +} + +buffer_double_mapped::~buffer_double_mapped() {} + +/*! + * sets d_vmcircbuf, d_base, d_bufsize. + * returns true iff successful. + */ +bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item) +{ + int orig_nitems = nitems; + + // Any buffer size we come up with must be a multiple of min_nitems. + int granularity = gr::vmcircbuf_sysconfig::granularity(); + int min_nitems = minimum_buffer_items(sizeof_item, granularity); + + // Round-up nitems to a multiple of min_nitems. + if (nitems % min_nitems != 0) + nitems = ((nitems / min_nitems) + 1) * min_nitems; + + // If we rounded-up a whole bunch, give the user a heads up. + // This only happens if sizeof_item is not a power of two. + if (nitems > 2 * orig_nitems && nitems * (int)sizeof_item > granularity) { + auto msg = + str(boost::format( + "allocate_buffer: tried to allocate" + " %d items of size %d. Due to alignment requirements" + " %d were allocated. If this isn't OK, consider padding" + " your structure to a power-of-two bytes." + " On this platform, our allocation granularity is %d bytes.") % + orig_nitems % sizeof_item % nitems % granularity); + GR_LOG_WARN(d_logger, msg.c_str()); + } + + d_bufsize = nitems; + d_vmcircbuf.reset(gr::vmcircbuf_sysconfig::make(d_bufsize * d_sizeof_item)); + if (d_vmcircbuf == 0) { + std::ostringstream msg; + msg << "gr::buffer::allocate_buffer: failed to allocate buffer of size " + << d_bufsize * d_sizeof_item / 1024 << " KB"; + GR_LOG_ERROR(d_logger, msg.str()); + return false; + } + + d_base = (char*)d_vmcircbuf->pointer_to_first_copy(); + return true; +} + +int buffer_double_mapped::space_available() +{ + if (d_readers.empty()) + return d_bufsize - 1; // See comment below + + else { + + // Find out the maximum amount of data available to our readers + int most_data = d_readers[0]->items_available(); + uint64_t min_items_read = d_readers[0]->nitems_read(); + for (size_t i = 1; i < d_readers.size(); i++) { + most_data = std::max(most_data, d_readers[i]->items_available()); + min_items_read = std::min(min_items_read, d_readers[i]->nitems_read()); + } + + if (min_items_read != d_last_min_items_read) { + prune_tags(d_last_min_items_read); + d_last_min_items_read = min_items_read; + } + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "space_available() called d_write_index: " << d_write_index + << " -- space_available: " << (d_bufsize - most_data - 1); + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // The -1 ensures that the case d_write_index == d_read_index is + // unambiguous. It indicates that there is no data for the reader + return d_bufsize - most_data - 1; + } +} + +} /* namespace gr */ diff --git a/gnuradio-runtime/lib/buffer_reader.cc b/gnuradio-runtime/lib/buffer_reader.cc new file mode 100644 index 0000000000..7ead53032e --- /dev/null +++ b/gnuradio-runtime/lib/buffer_reader.cc @@ -0,0 +1,180 @@ +/* -*- c++ -*- */ +/* + * Copyright 2004,2009,2010,2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include <gnuradio/block.h> +#include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader.h> +#include <gnuradio/buffer_reader_sm.h> +#include <gnuradio/integer_math.h> +#include <gnuradio/math.h> +#include <assert.h> +#include <algorithm> +#include <iostream> +#include <stdexcept> + +namespace gr { + +static long s_buffer_reader_count = 0; + +buffer_reader_sptr +buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay) +{ + if (nzero_preload < 0) + throw std::invalid_argument("buffer_add_reader: nzero_preload must be >= 0"); + + buffer_reader_sptr r; + + if (buf->get_mapping_type() == BufferMappingType::DoubleMapped) { + r.reset(new buffer_reader( + buf, buf->index_sub(buf->d_write_index, nzero_preload), link)); + r->declare_sample_delay(delay); + } else if (buf->get_mapping_type() == BufferMappingType::SingleMapped) { + r.reset(new buffer_reader_sm( + buf, buf->index_sub(buf->d_write_index, nzero_preload), link)); + r->declare_sample_delay(delay); + + // Update reader block history + buf->update_reader_block_history(link->history(), delay); + r->d_read_index = buf->d_write_index - nzero_preload; + } + + buf->d_readers.push_back(r.get()); + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + std::cerr << " [" << buf.get() << ";" << r.get() + << "] buffer_add_reader() nzero_preload " << nzero_preload + << " -- delay: " << delay << " -- history: " << link->history() + << " -- RD_idx: " << r->d_read_index << std::endl; +#endif + + return r; +} + +buffer_reader::buffer_reader(buffer_sptr buffer, unsigned int read_index, block_sptr link) + : d_buffer(buffer), + d_read_index(read_index), + d_abs_read_offset(0), + d_link(link), + d_attr_delay(0) +{ +#ifdef BUFFER_DEBUG + gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_reader"); +#endif + + s_buffer_reader_count++; +} + +buffer_reader::~buffer_reader() +{ + d_buffer->drop_reader(this); + s_buffer_reader_count--; +} + +void buffer_reader::declare_sample_delay(unsigned delay) +{ + d_attr_delay = delay; + d_buffer->d_max_reader_delay = std::max(d_attr_delay, d_buffer->d_max_reader_delay); +} + +unsigned buffer_reader::sample_delay() const { return d_attr_delay; } + +int buffer_reader::items_available() // const +{ + int available = d_buffer->index_sub(d_buffer->d_write_index, d_read_index); + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + std::ostringstream msg; + msg << "[" << d_buffer << ";" << this << "] " + << "items_available() WR_idx: " << d_buffer->d_write_index + << " -- WR items: " << d_buffer->nitems_written() + << " -- RD_idx: " << d_read_index << " -- RD items: " << nitems_read() << " (-" + << d_attr_delay << ") -- available: " << available; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + return available; +} + +const void* buffer_reader::read_pointer() +{ + return &d_buffer->d_base[d_read_index * d_buffer->d_sizeof_item]; +} + +void buffer_reader::update_read_pointer(int nitems) +{ + gr::thread::scoped_lock guard(*mutex()); + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + unsigned orig_rd_idx = d_read_index; +#endif + + d_read_index = d_buffer->index_add(d_read_index, nitems); + d_abs_read_offset += nitems; + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + std::ostringstream msg; + msg << "[" << d_buffer << ";" << this + << "] update_read_pointer -- orig d_read_index: " << orig_rd_idx + << " -- nitems: " << nitems << " -- d_read_index: " << d_read_index; + GR_LOG_DEBUG(d_buffer->d_logger, msg.str()); +#endif +} + +void buffer_reader::get_tags_in_range(std::vector<tag_t>& v, + uint64_t abs_start, + uint64_t abs_end, + long id) +{ + gr::thread::scoped_lock guard(*mutex()); + + uint64_t lower_bound = abs_start - d_attr_delay; + // check for underflow and if so saturate at 0 + if (lower_bound > abs_start) + lower_bound = 0; + uint64_t upper_bound = abs_end - d_attr_delay; + // check for underflow and if so saturate at 0 + if (upper_bound > abs_end) + upper_bound = 0; + + v.clear(); + std::multimap<uint64_t, tag_t>::iterator itr = + d_buffer->get_tags_lower_bound(lower_bound); + std::multimap<uint64_t, tag_t>::iterator itr_end = + d_buffer->get_tags_upper_bound(upper_bound); + + uint64_t item_time; + while (itr != itr_end) { + item_time = (*itr).second.offset + d_attr_delay; + if ((item_time >= abs_start) && (item_time < abs_end)) { + std::vector<long>::iterator id_itr; + id_itr = std::find( + itr->second.marked_deleted.begin(), itr->second.marked_deleted.end(), id); + // If id is not in the vector of marked blocks + if (id_itr == itr->second.marked_deleted.end()) { + tag_t t = (*itr).second; + t.offset += d_attr_delay; + v.push_back(t); + v.back().marked_deleted.clear(); + } + } + itr++; + } +} + +long buffer_reader_ncurrently_allocated() { return s_buffer_reader_count; } + +} /* namespace gr */ diff --git a/gnuradio-runtime/lib/buffer_reader_sm.cc b/gnuradio-runtime/lib/buffer_reader_sm.cc new file mode 100644 index 0000000000..9e0fac584f --- /dev/null +++ b/gnuradio-runtime/lib/buffer_reader_sm.cc @@ -0,0 +1,149 @@ +/* -*- c++ -*- */ +/* + * Copyright 2004,2009,2010,2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include <gnuradio/block.h> +#include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader_sm.h> +#include <gnuradio/integer_math.h> +#include <gnuradio/math.h> +#include <assert.h> +#include <algorithm> +#include <iostream> +#include <limits> +#include <stdexcept> + +namespace gr { + +buffer_reader_sm::~buffer_reader_sm() {} + +int buffer_reader_sm::items_available() +{ + int available = 0; + + if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) { + if (d_buffer->d_write_index == d_read_index) { + if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) { + available = d_buffer->d_bufsize - d_read_index; + } + } else { + available = d_buffer->index_sub(d_buffer->d_write_index, d_read_index); + } + } + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << d_buffer << ";" << this << "] " + << "items_available() WR_idx: " << d_buffer->d_write_index + << " -- WR items: " << d_buffer->nitems_written() + << " -- RD_idx: " << d_read_index << " -- RD items: " << nitems_read() << " (-" + << d_attr_delay << ") -- available: " << available; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + return available; +} + +bool buffer_reader_sm::input_blkd_cb_ready(int items_required) const +{ + gr::thread::scoped_lock(*d_buffer->mutex()); + + return (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) && + (d_buffer->d_write_index < d_read_index)); +} + +bool buffer_reader_sm::input_blocked_callback(int items_required, int items_avail) +{ + // Maybe adjust read pointers from min read index? + // This would mean that *all* readers must be > (passed) the write index + if (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) && + (d_buffer->d_write_index < d_read_index)) { + + // Update items available before going farther as it could be stale + items_avail = items_available(); + + // Find reader with the smallest read index that is greater than the + // write index + uint32_t min_reader_index = std::numeric_limits<uint32_t>::max(); + uint32_t min_read_idx = std::numeric_limits<uint32_t>::max(); + for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) { + if (d_buffer->d_readers[idx]->d_read_index > d_buffer->d_write_index) { + // Record index of reader with minimum read-index + if (d_buffer->d_readers[idx]->d_read_index < min_read_idx) { + min_read_idx = d_buffer->d_readers[idx]->d_read_index; + min_reader_index = idx; + } + } + } + + // Note items_avail might be zero, that's okay. + items_avail += d_read_index - min_read_idx; + int gap = min_read_idx - d_buffer->d_write_index; + if (items_avail > gap) { + return false; + } + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << d_buffer << ";" << this << "] " + << "input_blocked_callback() WR_idx: " << d_buffer->d_write_index + << " -- WR items: " << d_buffer->nitems_written() + << " -- BUFSIZE: " << d_buffer->d_bufsize << " -- RD_idx: " << min_read_idx; + for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) { + if (idx != min_reader_index) { + msg << " -- OTHER_RDR: " << d_buffer->d_readers[idx]->d_read_index; + } + } + + msg << " -- GAP: " << gap << " -- items_required: " << items_required + << " -- items_avail: " << items_avail; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Shift existing data down to make room for blocked data at end of buffer + uint32_t move_data_size = d_buffer->d_write_index * d_buffer->d_sizeof_item; + char* dest = d_buffer->d_base + (items_avail * d_buffer->d_sizeof_item); + std::memmove(dest, d_buffer->d_base, move_data_size); + + // Next copy the data from the end of the buffer back to the beginning + uint32_t avail_data_size = items_avail * d_buffer->d_sizeof_item; + char* src = d_buffer->d_base + (min_read_idx * d_buffer->d_sizeof_item); + std::memcpy(d_buffer->d_base, src, avail_data_size); + + // Now adjust write pointer + d_buffer->d_write_index += items_avail; + + // Finally adjust all reader pointers + for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) { + if (idx == min_reader_index) { + d_buffer->d_readers[idx]->d_read_index = 0; + } else { + d_buffer->d_readers[idx]->d_read_index += items_avail; + d_buffer->d_readers[idx]->d_read_index %= d_buffer->d_bufsize; + } + } + + return true; + } + + return false; +} + +buffer_reader_sm::buffer_reader_sm(buffer_sptr buffer, + unsigned int read_index, + block_sptr link) + : buffer_reader(buffer, read_index, link) +{ +} + + +} /* namespace gr */ diff --git a/gnuradio-runtime/lib/buffer_single_mapped.cc b/gnuradio-runtime/lib/buffer_single_mapped.cc new file mode 100644 index 0000000000..6024396557 --- /dev/null +++ b/gnuradio-runtime/lib/buffer_single_mapped.cc @@ -0,0 +1,297 @@ +/* -*- c++ -*- */ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif +#include <gnuradio/block.h> +#include <gnuradio/buffer_reader.h> +#include <gnuradio/buffer_single_mapped.h> +#include <gnuradio/integer_math.h> +#include <gnuradio/math.h> +#include <gnuradio/thread/thread.h> +#include <assert.h> +#include <algorithm> +#include <iostream> +#include <stdexcept> + +namespace gr { + +buffer_single_mapped::buffer_single_mapped(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + block_sptr link, + block_sptr buf_owner) + : buffer(BufferMappingType::SingleMapped, + nitems, + sizeof_item, + downstream_lcm_nitems, + link), + d_buf_owner(buf_owner), + d_buffer(nullptr, + std::bind(&buffer_single_mapped::deleter, this, std::placeholders::_1)) +{ + gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_single_mapped"); + if (!allocate_buffer(nitems, sizeof_item, downstream_lcm_nitems)) + throw std::bad_alloc(); + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + { + std::ostringstream msg; + msg << "[" << this << "] " + << "buffer_single_mapped constructor -- history: " << link->history(); + GR_LOG_DEBUG(d_logger, msg.str()); + } +#endif +} + +buffer_single_mapped::~buffer_single_mapped() {} + +/*! + * Allocates underlying buffer. + * returns true iff successful. + */ +bool buffer_single_mapped::allocate_buffer(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems) +{ +#ifdef BUFFER_DEBUG + int orig_nitems = nitems; +#endif + + // Unlike the double mapped buffer case that can easily wrap back onto itself + // for both reads and writes the single mapped case needs to be aware of read + // and write granularity and size the underlying buffer accordingly. Otherwise + // the calls to space_available() and items_available() may return values that + // are too small and the scheduler will get stuck. + uint64_t write_granularity = 1; + + if (link()->fixed_rate()) { + // Fixed rate + int num_inputs = + link()->fixed_rate_noutput_to_ninput(1) - (link()->history() - 1); + write_granularity = + link()->fixed_rate_ninput_to_noutput(num_inputs + (link()->history() - 1)); + } + + if (link()->relative_rate() != 1.0) { + // Some blocks say they have fixed rate but actually have a relative + // rate set (looking at you puncture_bb...) so make this a separate + // check. + + // Relative rate + write_granularity = link()->relative_rate_i(); + } + + // If the output multiple has been set explicitly then adjust the write + // granularity. + if (link()->output_multiple_set()) { + write_granularity = + GR_LCM(write_granularity, (uint64_t)link()->output_multiple()); + } + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "WRITE GRANULARITY: " << write_granularity; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Adjust size so output buffer size is a multiple of the write granularity + if (write_granularity != 1 || downstream_lcm_nitems != 1) { + uint64_t size_align_adjust = GR_LCM(write_granularity, downstream_lcm_nitems); + uint64_t remainder = nitems % size_align_adjust; + nitems += (remainder > 0) ? (size_align_adjust - remainder) : 0; + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "allocate_buffer()** called nitems: " << orig_nitems + << " -- read_multiple: " << downstream_lcm_nitems + << " -- write_multiple: " << write_granularity + << " -- NEW nitems: " << nitems; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + } + + // Allocate a new custom buffer from the owning block + char* buf = buf_owner()->allocate_custom_buffer(nitems * sizeof_item); + assert(buf != nullptr); + d_buffer.reset(buf); + + d_base = d_buffer.get(); + d_bufsize = nitems; + + d_downstream_lcm_nitems = downstream_lcm_nitems; + d_write_multiple = write_granularity; + + return true; +} + +bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple) +{ + uint32_t space_avail = 0; + { + gr::thread::scoped_lock(*this->mutex()); + space_avail = space_available(); + } + return ((space_avail > 0) && + ((space_avail / output_multiple) * output_multiple == 0)); +} + + +bool buffer_single_mapped::output_blocked_callback(int output_multiple, bool force) +{ + uint32_t space_avail = space_available(); + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "output_blocked_callback()*** WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() + << " -- output_multiple: " << output_multiple + << " -- space_avail: " << space_avail << " -- force: " << force; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) || + force) { + // Find reader with the smallest read index + uint32_t min_read_idx = d_readers[0]->d_read_index; + for (size_t idx = 1; idx < d_readers.size(); ++idx) { + // Record index of reader with minimum read-index + if (d_readers[idx]->d_read_index < min_read_idx) { + min_read_idx = d_readers[idx]->d_read_index; + } + } + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "output_blocked_callback() WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx + << " -- shortcircuit: " + << ((min_read_idx == 0) || (min_read_idx >= d_write_index)) + << " -- to_move_items: " << (d_write_index - min_read_idx) + << " -- space_avail: " << space_avail << " -- force: " << force; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Make sure we have enough room to start writing back at the beginning + if ((min_read_idx == 0) || (min_read_idx >= d_write_index)) { + return false; + } + + // Determine how much "to be read" data needs to be moved + int to_move_items = d_write_index - min_read_idx; + assert(to_move_items > 0); + uint32_t to_move_bytes = to_move_items * d_sizeof_item; + + // Shift "to be read" data back to the beginning of the buffer + std::memmove(d_base, d_base + (min_read_idx * d_sizeof_item), to_move_bytes); + + // Adjust write index and each reader index + d_write_index -= min_read_idx; + + for (size_t idx = 0; idx < d_readers.size(); ++idx) { + d_readers[idx]->d_read_index -= min_read_idx; + } + + return true; + } + + return false; +} + +int buffer_single_mapped::space_available() +{ + if (d_readers.empty()) + return d_bufsize; + + else { + + size_t min_items_read_idx = 0; + uint64_t min_items_read = d_readers[0]->nitems_read(); + for (size_t idx = 1; idx < d_readers.size(); ++idx) { + // Record index of reader with minimum nitems read + if (d_readers[idx]->nitems_read() < + d_readers[min_items_read_idx]->nitems_read()) { + min_items_read_idx = idx; + } + min_items_read = std::min(min_items_read, d_readers[idx]->nitems_read()); + } + + buffer_reader* min_idx_reader = d_readers[min_items_read_idx]; + unsigned min_read_index = d_readers[min_items_read_idx]->d_read_index; + + // For single mapped buffer there is no wrapping beyond the end of the + // buffer +#ifdef BUFFER_DEBUG + int thecase = 4; // REMOVE ME - just for debug +#endif + int space = d_bufsize - d_write_index; + + if (min_read_index == d_write_index) { +#ifdef BUFFER_DEBUG + thecase = 1; +#endif + + // If the (min) read index and write index are equal then the buffer + // is either completely empty or completely full depending on if + // the number of items read matches the number written + size_t offset = ((min_idx_reader->link()->history() - 1) + + min_idx_reader->sample_delay()); + if ((min_idx_reader->nitems_read() - offset) != nitems_written()) { +#ifdef BUFFER_DEBUG + thecase = 2; +#endif + space = 0; + } + } else if (min_read_index > d_write_index) { +#ifdef BUFFER_DEBUG + thecase = 3; +#endif + space = min_read_index - d_write_index; + // Leave extra space in case the reader gets stuck and needs realignment + { + if ((d_write_index > (d_bufsize / 2)) || + (min_read_index < (d_bufsize / 2))) { +#ifdef BUFFER_DEBUG + thecase = 17; +#endif + space = 0; + } else { + space = (d_bufsize / 2) - d_write_index; + } + } + } + + if (min_items_read != d_last_min_items_read) { + prune_tags(d_last_min_items_read); + d_last_min_items_read = min_items_read; + } + +#ifdef BUFFER_DEBUG + // BUFFER DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "space_available() called (case: " << thecase + << ") d_write_index: " << d_write_index << " (" << nitems_written() << ") " + << " -- min_read_index: " << min_read_index << " (" + << min_idx_reader->nitems_read() << ") " + << " -- space: " << space + << " (sample delay: " << min_idx_reader->sample_delay() << ")"; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + return space; + } +} + +} /* namespace gr */ diff --git a/gnuradio-runtime/lib/buffer_type.cc b/gnuradio-runtime/lib/buffer_type.cc new file mode 100644 index 0000000000..b667eded1e --- /dev/null +++ b/gnuradio-runtime/lib/buffer_type.cc @@ -0,0 +1,18 @@ +/* -*- c++ -*- */ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ +#include <gnuradio/buffer_type.h> + +namespace gr { + +uint32_t buffer_type_base::s_nextId = 0; +std::mutex buffer_type_base::s_mutex; + + +} /* namespace gr */
\ No newline at end of file diff --git a/gnuradio-runtime/lib/flat_flowgraph.cc b/gnuradio-runtime/lib/flat_flowgraph.cc index 0df75553e0..b9986f8370 100644 --- a/gnuradio-runtime/lib/flat_flowgraph.cc +++ b/gnuradio-runtime/lib/flat_flowgraph.cc @@ -15,6 +15,9 @@ #include "flat_flowgraph.h" #include <gnuradio/block_detail.h> #include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader.h> +#include <gnuradio/buffer_type.h> +#include <gnuradio/integer_math.h> #include <gnuradio/logger.h> #include <gnuradio/prefs.h> #include <volk/volk.h> @@ -24,6 +27,7 @@ namespace gr { + // 32Kbyte buffer size between blocks #define GR_FIXED_BUFFER_SIZE (32 * (1L << 10)) @@ -44,8 +48,9 @@ void flat_flowgraph::setup_connections() basic_block_vector_t blocks = calc_used_blocks(); // Assign block details to blocks - for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) - cast_to_block_sptr(*p)->set_detail(allocate_block_detail(*p)); + for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) { + allocate_block_detail(*p); + } // Connect inputs to outputs for each block for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) { @@ -56,7 +61,7 @@ void flat_flowgraph::setup_connections() block->set_is_unaligned(false); } - // Connect message ports connetions + // Connect message ports connections for (msg_edge_viter_t i = d_msg_edges.begin(); i != d_msg_edges.end(); i++) { GR_LOG_DEBUG( d_debug_logger, @@ -67,11 +72,10 @@ void flat_flowgraph::setup_connections() } } -block_detail_sptr flat_flowgraph::allocate_block_detail(basic_block_sptr block) +void flat_flowgraph::allocate_block_detail(basic_block_sptr block) { int ninputs = calc_used_ports(block, true).size(); int noutputs = calc_used_ports(block, false).size(); - block_detail_sptr detail = make_block_detail(ninputs, noutputs); block_sptr grblock = cast_to_block_sptr(block); if (!grblock) @@ -80,98 +84,88 @@ block_detail_sptr flat_flowgraph::allocate_block_detail(basic_block_sptr block) block->alias()) .str()); - GR_LOG_DEBUG(d_debug_logger, "Creating block detail for " + block->identifier()); + // Determine the downstream max per output port + std::vector<int> downstream_max_nitems(noutputs, 0); + std::vector<uint64_t> downstream_lcm_nitems(noutputs, 1); +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "BLOCK: " << block->identifier(); + GR_LOG_DEBUG(d_logger, msg.str()); // could also be d_debug_logger +#endif for (int i = 0; i < noutputs; i++) { - grblock->expand_minmax_buffer(i); + int nitems = 0; + uint64_t lcm_nitems = 1; + basic_block_vector_t downstream_blocks = calc_downstream_blocks(grblock, i); + for (basic_block_viter_t blk = downstream_blocks.begin(); + blk != downstream_blocks.end(); + blk++) { + block_sptr dgrblock = cast_to_block_sptr(*blk); + if (!dgrblock) + throw std::runtime_error("allocate_buffer found non-gr::block"); + +#ifdef BUFFER_DEBUG + msg.str(""); + msg << " DWNSTRM BLOCK: " << dgrblock->identifier(); + GR_LOG_DEBUG(d_logger, msg.str()); +#endif - buffer_sptr buffer = allocate_buffer(block, i); - GR_LOG_DEBUG(d_debug_logger, - "Allocated buffer for output " + block->identifier() + " " + - std::to_string(i)); - detail->set_output(i, buffer); - - // Update the block's max_output_buffer based on what was actually allocated. - if ((grblock->max_output_buffer(i) != buffer->bufsize()) && - (grblock->max_output_buffer(i) != -1)) - GR_LOG_WARN(d_logger, - boost::format("Block (%1%) max output buffer set to %2%" - " instead of requested %3%") % - grblock->alias() % buffer->bufsize() % - grblock->max_output_buffer(i)); - grblock->set_max_output_buffer(i, buffer->bufsize()); - } + // If any downstream blocks are decimators and/or have a large + // output_multiple, ensure we have a buffer at least twice their + // decimation factor*output_multiple + double decimation = (1.0 / dgrblock->relative_rate()); + int multiple = dgrblock->output_multiple(); + int history = dgrblock->history(); + nitems = + std::max(nitems, static_cast<int>(2 * (decimation * multiple + history))); + + // Calculate the LCM of downstream reader nitems +#ifdef BUFFER_DEBUG + msg.str(""); + msg << " OUT MULTIPLE: " << multiple; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif - return detail; -} + if (dgrblock->fixed_rate()) { + lcm_nitems = GR_LCM(lcm_nitems, + (uint64_t)(dgrblock->fixed_rate_noutput_to_ninput(1) - + (dgrblock->history() - 1))); + } + if (dgrblock->relative_rate() != 1.0) { + // Relative rate + lcm_nitems = GR_LCM(lcm_nitems, dgrblock->relative_rate_d()); + } -buffer_sptr flat_flowgraph::allocate_buffer(basic_block_sptr block, int port) -{ - block_sptr grblock = cast_to_block_sptr(block); - if (!grblock) - throw std::runtime_error("allocate_buffer found non-gr::block"); - int item_size = block->output_signature()->sizeof_stream_item(port); - - // *2 because we're now only filling them 1/2 way in order to - // increase the available parallelism when using the TPB scheduler. - // (We're double buffering, where we used to single buffer) - int nitems = s_fixed_buffer_size * 2 / item_size; - - // Make sure there are at least twice the output_multiple no. of items - if (nitems < 2 * grblock->output_multiple()) // Note: this means output_multiple() - nitems = 2 * grblock->output_multiple(); // can't be changed by block dynamically - - // If any downstream blocks are decimators and/or have a large output_multiple, - // ensure we have a buffer at least twice their decimation factor*output_multiple - basic_block_vector_t blocks = calc_downstream_blocks(block, port); - - // limit buffer size if indicated - if (grblock->max_output_buffer(port) > 0) { - // GR_LOG_INFO(d_debug_logger, boost::format("constraining output items to %d") - // % block->max_output_buffer(port)); - nitems = std::min((long)nitems, (long)grblock->max_output_buffer(port)); - nitems -= nitems % grblock->output_multiple(); - if (nitems < 1) - throw std::runtime_error("problems allocating a buffer with the given max " - "output buffer constraint!"); - } else if (grblock->min_output_buffer(port) > 0) { - nitems = std::max((long)nitems, (long)grblock->min_output_buffer(port)); - nitems -= nitems % grblock->output_multiple(); - if (nitems < 1) - throw std::runtime_error("problems allocating a buffer with the given min " - "output buffer constraint!"); - } + // Sanity check, make sure lcm_nitems is at least 1 + if (lcm_nitems < 1) { + lcm_nitems = 1; + } - for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) { - block_sptr dgrblock = cast_to_block_sptr(*p); - if (!dgrblock) - throw std::runtime_error("allocate_buffer found non-gr::block"); - - double decimation = (1.0 / dgrblock->relative_rate()); - int multiple = dgrblock->output_multiple(); - int history = dgrblock->history(); - nitems = - std::max(nitems, static_cast<int>(2 * (decimation * multiple + history))); - } +#ifdef BUFFER_DEBUG + msg.str(""); + msg << " NINPUT_ITEMS: " << nitems; + GR_LOG_DEBUG(d_logger, msg.str()); - // std::cout << "make_buffer(" << nitems << ", " << item_size << ", " << grblock << - // "\n"; - // We're going to let this fail once and retry. If that fails, - // throw and exit. - buffer_sptr b; - try { - b = make_buffer(nitems, item_size, grblock); - } catch (std::bad_alloc&) { - b = make_buffer(nitems, item_size, grblock); - } + msg.str(""); + msg << " LCM NITEMS: " << lcm_nitems; + GR_LOG_DEBUG(d_logger, msg.str()); - // Set the max noutput items size here to make sure it's always - // set in the block and available in the start() method. - // But don't overwrite if the user has set this externally. - if (!grblock->is_set_max_noutput_items()) - grblock->set_max_noutput_items(nitems); + msg.str(""); + msg << " HISTORY: " << dgrblock->history(); + GR_LOG_DEBUG(d_logger, msg.str()); - return b; + msg.str(""); + msg << " DELAY: " << dgrblock->sample_delay(0); + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + } + downstream_max_nitems[i] = nitems; + downstream_lcm_nitems[i] = lcm_nitems; + } + + // Allocate the block detail and necessary buffers + grblock->allocate_detail( + ninputs, noutputs, downstream_max_nitems, downstream_lcm_nitems); } void flat_flowgraph::connect_block_inputs(basic_block_sptr block) @@ -194,8 +188,40 @@ void flat_flowgraph::connect_block_inputs(basic_block_sptr block) block_sptr src_grblock = cast_to_block_sptr(src_block); if (!src_grblock) throw std::runtime_error("connect_block_inputs found non-gr::block"); - buffer_sptr src_buffer = src_grblock->detail()->output(src_port); + buffer_sptr src_buffer; + buffer_type_t src_buf_type = src_grblock->get_buffer_type(); + buffer_type_t dest_buf_type = grblock->get_buffer_type(); + if (dest_buf_type == buftype_DEFAULT_NON_CUSTOM::get() || + dest_buf_type == src_buf_type) { + // The block is not using a custom buffer OR the block and the upstream + // block both use the same kind of custom buffer + src_buffer = src_grblock->detail()->output(src_port); + } else { + if (dest_buf_type != buftype_DEFAULT_NON_CUSTOM::get() && + src_buf_type == buftype_DEFAULT_NON_CUSTOM::get()) { + // The block uses a custom buffer but the upstream block does not + // therefore the upstream block's buffer can be replaced with the + // type of buffer that the block needs + std::ostringstream msg; + msg << "Block: " << grblock->identifier() + << "replacing upstream block: " << src_grblock->identifier() + << " buffer with a custom buffer"; + GR_LOG_DEBUG(d_debug_logger, msg.str()); + src_buffer = src_grblock->replace_buffer(src_port, grblock); + } else { + // Both the block and upstream block use incompatible buffer types + // which is not currently allowed + std::ostringstream msg; + msg << "Block: " << grblock->identifier() + << " and upstream block: " << src_grblock->identifier() + << " use incompatible custom buffer types (" << dest_buf_type.name() + << " -- " << src_buf_type.name() << ") --> " + << (dest_buf_type == src_buf_type); + GR_LOG_ERROR(d_logger, msg.str()); + throw std::runtime_error(msg.str()); + } + } GR_LOG_DEBUG(d_debug_logger, "Setting input " + std::to_string(dst_port) + " from edge " + @@ -220,7 +246,7 @@ void flat_flowgraph::merge_connections(flat_flowgraph_sptr old_ffg) if (!block->detail()) { GR_LOG_DEBUG(d_debug_logger, "merge: allocating new detail for block " + block->identifier()); - block->set_detail(allocate_block_detail(block)); + allocate_block_detail(block); } else { GR_LOG_DEBUG(d_debug_logger, "merge: reusing original detail for block " + diff --git a/gnuradio-runtime/lib/flat_flowgraph.h b/gnuradio-runtime/lib/flat_flowgraph.h index deb0fe2c6a..71645d6a25 100644 --- a/gnuradio-runtime/lib/flat_flowgraph.h +++ b/gnuradio-runtime/lib/flat_flowgraph.h @@ -78,8 +78,7 @@ public: private: flat_flowgraph(); - block_detail_sptr allocate_block_detail(basic_block_sptr block); - buffer_sptr allocate_buffer(basic_block_sptr block, int port); + void allocate_block_detail(basic_block_sptr block); void connect_block_inputs(basic_block_sptr block); /* When reusing a flowgraph's blocks, this call makes sure all of diff --git a/gnuradio-runtime/lib/qa_buffer.cc b/gnuradio-runtime/lib/qa_buffer.cc index c0e9c0d130..cefe548338 100644 --- a/gnuradio-runtime/lib/qa_buffer.cc +++ b/gnuradio-runtime/lib/qa_buffer.cc @@ -14,6 +14,8 @@ #include <gnuradio/buffer.h> +#include <gnuradio/buffer_double_mapped.h> +#include <gnuradio/buffer_reader.h> #include <gnuradio/random.h> #include <boost/test/unit_test.hpp> #include <cstdlib> @@ -40,7 +42,8 @@ static void t0_body() int nitems = 4000 / sizeof(int); int counter = 0; - gr::buffer_sptr buf(gr::make_buffer(nitems, sizeof(int), gr::block_sptr())); + gr::buffer_sptr buf( + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); int last_sa; int sa; @@ -74,7 +77,8 @@ static void t1_body() int write_counter = 0; int read_counter = 0; - gr::buffer_sptr buf(gr::make_buffer(nitems, sizeof(int), gr::block_sptr())); + gr::buffer_sptr buf( + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr())); int sa; @@ -145,7 +149,8 @@ static void t2_body() int nitems = (64 * (1L << 10)) / sizeof(int); // 64K worth of ints - gr::buffer_sptr buf(gr::make_buffer(nitems, sizeof(int), gr::block_sptr())); + gr::buffer_sptr buf( + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr())); int read_counter = 0; @@ -210,7 +215,8 @@ static void t3_body() int nitems = (64 * (1L << 10)) / sizeof(int); static const int N = 5; - gr::buffer_sptr buf(gr::make_buffer(nitems, sizeof(int), gr::block_sptr())); + gr::buffer_sptr buf( + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); gr::buffer_reader_sptr reader[N]; int read_counter[N]; int write_counter = 0; diff --git a/gnuradio-runtime/lib/tpb_detail.cc b/gnuradio-runtime/lib/tpb_detail.cc index 312a5bf5e2..badc4cf9ef 100644 --- a/gnuradio-runtime/lib/tpb_detail.cc +++ b/gnuradio-runtime/lib/tpb_detail.cc @@ -15,6 +15,7 @@ #include <gnuradio/block.h> #include <gnuradio/block_detail.h> #include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader.h> #include <gnuradio/tpb_detail.h> namespace gr { diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt b/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt index 14eb501ddb..dbc74b0529 100644 --- a/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt +++ b/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt @@ -17,6 +17,7 @@ messages/msg_queue_python.cc block_gateway_python.cc # block_registry_python.cc buffer_python.cc + buffer_reader_python.cc constants_python.cc endianness_python.cc expj_python.cc diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/basic_block_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/basic_block_python.cc index f18fce2c9a..933d651fa7 100644 --- a/gnuradio-runtime/python/gnuradio/gr/bindings/basic_block_python.cc +++ b/gnuradio-runtime/python/gnuradio/gr/bindings/basic_block_python.cc @@ -13,8 +13,8 @@ /* If manual edits are made, the following tags should be modified accordingly. */ /* BINDTOOL_GEN_AUTOMATIC(0) */ /* BINDTOOL_USE_PYGCCXML(0) */ -/* BINDTOOL_HEADER_FILE(basic_block.h) */ -/* BINDTOOL_HEADER_FILE_HASH(9239cc3381582f5f44010485cd48fa72) */ +/* BINDTOOL_HEADER_FILE(basic_block.h) */ +/* BINDTOOL_HEADER_FILE_HASH(5c1d5b8a3666a2e0e7a6fafae07afa29) */ /***********************************************************************************/ #include <pybind11/complex.h> diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/block_detail_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/block_detail_python.cc index 219c95b153..979c19f2f2 100644 --- a/gnuradio-runtime/python/gnuradio/gr/bindings/block_detail_python.cc +++ b/gnuradio-runtime/python/gnuradio/gr/bindings/block_detail_python.cc @@ -13,8 +13,8 @@ /* If manual edits are made, the following tags should be modified accordingly. */ /* BINDTOOL_GEN_AUTOMATIC(0) */ /* BINDTOOL_USE_PYGCCXML(0) */ -/* BINDTOOL_HEADER_FILE(block_detail.h) */ -/* BINDTOOL_HEADER_FILE_HASH(274b4f673ac76cedd3ef6d466afab2fc) */ +/* BINDTOOL_HEADER_FILE(block_detail.h) */ +/* BINDTOOL_HEADER_FILE_HASH(61794baf0e727516eb0eedc08753a17a) */ /***********************************************************************************/ #include <pybind11/complex.h> @@ -25,6 +25,7 @@ namespace py = pybind11; #include <gnuradio/block_detail.h> #include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader.h> // pydoc.h is automatically generated in the build directory #include <block_detail_pydoc.h> diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc index 78c5b03cfe..9660a7f245 100644 --- a/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc +++ b/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc @@ -14,7 +14,7 @@ /* BINDTOOL_GEN_AUTOMATIC(0) */ /* BINDTOOL_USE_PYGCCXML(0) */ /* BINDTOOL_HEADER_FILE(block.h) */ -/* BINDTOOL_HEADER_FILE_HASH(5df7331a717fb7c436eac2fc20991858) */ +/* BINDTOOL_HEADER_FILE_HASH(238d129ad018daa3146ff1d8867dc356) */ /***********************************************************************************/ #include <pybind11/complex.h> diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc index 89845d2e28..de7d4edf1c 100644 --- a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc +++ b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc @@ -13,8 +13,8 @@ /* If manual edits are made, the following tags should be modified accordingly. */ /* BINDTOOL_GEN_AUTOMATIC(0) */ /* BINDTOOL_USE_PYGCCXML(0) */ -/* BINDTOOL_HEADER_FILE(buffer.h) */ -/* BINDTOOL_HEADER_FILE_HASH(69b17e66fac6e29f860466846a64feab) */ +/* BINDTOOL_HEADER_FILE(buffer.h) */ +/* BINDTOOL_HEADER_FILE_HASH(e5247f4fe5b5873c66eed72880194981) */ /***********************************************************************************/ #include <pybind11/complex.h> @@ -25,6 +25,7 @@ namespace py = pybind11; #include <gnuradio/block.h> #include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader.h> // pydoc.h is automatically generated in the build directory #include <buffer_pydoc.h> diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc new file mode 100644 index 0000000000..23e2f39d10 --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc @@ -0,0 +1,126 @@ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +/***********************************************************************************/ +/* This file is automatically generated using bindtool and can be manually edited */ +/* The following lines can be configured to regenerate this file during cmake */ +/* If manual edits are made, the following tags should be modified accordingly. */ +/* BINDTOOL_GEN_AUTOMATIC(0) */ +/* BINDTOOL_USE_PYGCCXML(0) */ +/* BINDTOOL_HEADER_FILE(buffer_reader.h) */ +/* BINDTOOL_HEADER_FILE_HASH(451fcbd61f40b7d17a151474869aad75) */ +/***********************************************************************************/ + +#include <pybind11/complex.h> +#include <pybind11/pybind11.h> +#include <pybind11/stl.h> + +namespace py = pybind11; + +#include <gnuradio/block.h> +#include <gnuradio/buffer_reader.h> +// pydoc.h is automatically generated in the build directory +#include <buffer_pydoc.h> + +void bind_buffer_reader(py::module& m) +{ + + using buffer_reader = ::gr::buffer_reader; + + py::class_<buffer_reader, std::shared_ptr<buffer_reader>>( + m, "buffer_reader", D(buffer_reader)) + + .def(py::init<gr::buffer_reader const&>(), + py::arg("arg0"), + D(buffer_reader, buffer_reader)) + + + .def("declare_sample_delay", + &buffer_reader::declare_sample_delay, + py::arg("delay"), + D(buffer_reader, declare_sample_delay)) + + + .def("sample_delay", &buffer_reader::sample_delay, D(buffer_reader, sample_delay)) + + + .def("items_available", + &buffer_reader::items_available, + D(buffer_reader, items_available)) + + + .def("buffer", &buffer_reader::buffer, D(buffer_reader, buffer)) + + + .def("max_possible_items_available", + &buffer_reader::max_possible_items_available, + D(buffer_reader, max_possible_items_available)) + + + .def("read_pointer", &buffer_reader::read_pointer, D(buffer_reader, read_pointer)) + + + .def("update_read_pointer", + &buffer_reader::update_read_pointer, + py::arg("nitems"), + D(buffer_reader, update_read_pointer)) + + + .def("set_done", + &buffer_reader::set_done, + py::arg("done"), + D(buffer_reader, set_done)) + + + .def("done", &buffer_reader::done, D(buffer_reader, done)) + + + .def("mutex", &buffer_reader::mutex, D(buffer_reader, mutex)) + + + .def("nitems_read", &buffer_reader::nitems_read, D(buffer_reader, nitems_read)) + + + .def("reset_nitem_counter", + &buffer_reader::reset_nitem_counter, + D(buffer_reader, reset_nitem_counter)) + + + .def("get_sizeof_item", + &buffer_reader::get_sizeof_item, + D(buffer_reader, get_sizeof_item)) + + + .def("link", &buffer_reader::link, D(buffer_reader, link)) + + + .def("get_tags_in_range", + &buffer_reader::get_tags_in_range, + py::arg("v"), + py::arg("abs_start"), + py::arg("abs_end"), + py::arg("id"), + D(buffer_reader, get_tags_in_range)) + + ; + + + m.def("buffer_add_reader", + &::gr::buffer_add_reader, + py::arg("buf"), + py::arg("nzero_preload"), + py::arg("link") = gr::block_sptr(), + py::arg("delay") = 0, + D(buffer_add_reader)); + + + m.def("buffer_reader_ncurrently_allocated", + &::gr::buffer_reader_ncurrently_allocated, + D(buffer_reader_ncurrently_allocated)); +} diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/logger_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/logger_python.cc index 8ec43201fb..cccc60fe30 100644 --- a/gnuradio-runtime/python/gnuradio/gr/bindings/logger_python.cc +++ b/gnuradio-runtime/python/gnuradio/gr/bindings/logger_python.cc @@ -13,8 +13,8 @@ /* If manual edits are made, the following tags should be modified accordingly. */ /* BINDTOOL_GEN_AUTOMATIC(0) */ /* BINDTOOL_USE_PYGCCXML(0) */ -/* BINDTOOL_HEADER_FILE(logger.h) */ -/* BINDTOOL_HEADER_FILE_HASH(eaf28bcaeb7c34dc0fca3fdd4a9860c0) */ +/* BINDTOOL_HEADER_FILE(logger.h) */ +/* BINDTOOL_HEADER_FILE_HASH(6d4f118d476b888f79737f835ed27a4d) */ /***********************************************************************************/ #include <pybind11/complex.h> diff --git a/gnuradio-runtime/python/gnuradio/gr_unittest.py b/gnuradio-runtime/python/gnuradio/gr_unittest.py index 6177f0f32f..ebd47019a5 100644 --- a/gnuradio-runtime/python/gnuradio/gr_unittest.py +++ b/gnuradio-runtime/python/gnuradio/gr_unittest.py @@ -110,6 +110,25 @@ class TestCase(unittest.TestCase): self.assertComplexAlmostEqual2(x, y, abs_eps, rel_eps, msg) for (x, y) in zip(a, b) ]) + + + def assertSequenceEqualGR(self, data_in, data_out): + """ + Note this function exists because of this bug: https://bugs.python.org/issue19217 + Calling self.assertEqual(seqA, seqB) can hang if seqA and seqB are not equal. + """ + if len(data_in) != len(data_out): + print('Lengths do not match: {:d} -- {:d}'.format(len(data_in), len(data_out))) + self.assertTrue(len(data_in) == len(data_out)) + total_miscompares = 0 + for idx, item in enumerate(zip(data_in, data_out)): + if item[0] != item[1]: + total_miscompares += 1 + print('Miscompare at: {:d} ({:d} -- {:d})'.format(idx, item[0], item[1])) + if total_miscompares > 0: + print('Total miscompares: {:d}'.format(total_miscompares)) + self.assertTrue(total_miscompares == 0) + def waitFor( self, diff --git a/gr-audio/lib/portaudio/portaudio_sink.cc b/gr-audio/lib/portaudio/portaudio_sink.cc index a197776adc..ea27c7c0f9 100644 --- a/gr-audio/lib/portaudio/portaudio_sink.cc +++ b/gr-audio/lib/portaudio/portaudio_sink.cc @@ -65,7 +65,8 @@ void portaudio_sink::create_ringbuffer(void) (N_BUFFERS * bufsize_samples / d_output_parameters.channelCount)); // FYI, the buffer indices are in units of samples. - d_writer = gr::make_buffer(N_BUFFERS * bufsize_samples, sizeof(sample_t)); + d_writer = gr::make_buffer( + N_BUFFERS * bufsize_samples, sizeof(sample_t), N_BUFFERS * bufsize_samples); d_reader = gr::buffer_add_reader(d_writer, 0); } diff --git a/gr-audio/lib/portaudio/portaudio_sink.h b/gr-audio/lib/portaudio/portaudio_sink.h index 2e25e2f740..ae28705647 100644 --- a/gr-audio/lib/portaudio/portaudio_sink.h +++ b/gr-audio/lib/portaudio/portaudio_sink.h @@ -12,6 +12,7 @@ #include <gnuradio/audio/sink.h> #include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader.h> #include <gnuradio/logger.h> #include <gnuradio/thread/thread.h> #include <portaudio.h> diff --git a/gr-audio/lib/portaudio/portaudio_source.cc b/gr-audio/lib/portaudio/portaudio_source.cc index 86a7f16b48..63a3d5e1ba 100644 --- a/gr-audio/lib/portaudio/portaudio_source.cc +++ b/gr-audio/lib/portaudio/portaudio_source.cc @@ -64,7 +64,8 @@ void portaudio_source::create_ringbuffer(void) (N_BUFFERS * bufsize_samples / d_input_parameters.channelCount)); // FYI, the buffer indices are in units of samples. - d_writer = gr::make_buffer(N_BUFFERS * bufsize_samples, sizeof(sample_t)); + d_writer = gr::make_buffer( + N_BUFFERS * bufsize_samples, sizeof(sample_t), N_BUFFERS * bufsize_samples); d_reader = gr::buffer_add_reader(d_writer, 0); } diff --git a/gr-audio/lib/portaudio/portaudio_source.h b/gr-audio/lib/portaudio/portaudio_source.h index ecaf5a1c5b..baab6bee42 100644 --- a/gr-audio/lib/portaudio/portaudio_source.h +++ b/gr-audio/lib/portaudio/portaudio_source.h @@ -13,6 +13,7 @@ #include <gnuradio/audio/source.h> #include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader.h> #include <gnuradio/logger.h> #include <gnuradio/thread/thread.h> #include <portaudio.h> diff --git a/gr-blocks/lib/qa_gr_block.cc b/gr-blocks/lib/qa_gr_block.cc index ff88902e98..767a252ddb 100644 --- a/gr-blocks/lib/qa_gr_block.cc +++ b/gr-blocks/lib/qa_gr_block.cc @@ -15,6 +15,7 @@ #include <gnuradio/block.h> #include <gnuradio/blocks/null_sink.h> #include <gnuradio/blocks/null_source.h> +#include <gnuradio/buffer_reader.h> #include <boost/test/unit_test.hpp> BOOST_AUTO_TEST_CASE(t0) diff --git a/gr-blocks/lib/qa_gr_top_block.cc b/gr-blocks/lib/qa_gr_top_block.cc index 6a9e0f5467..5374a67220 100644 --- a/gr-blocks/lib/qa_gr_top_block.cc +++ b/gr-blocks/lib/qa_gr_top_block.cc @@ -20,7 +20,7 @@ #include <boost/test/unit_test.hpp> #include <iostream> -#define VERBOSE 0 +#define VERBOSE 1 BOOST_AUTO_TEST_CASE(t0) { @@ -250,6 +250,7 @@ BOOST_AUTO_TEST_CASE(t10_reconfig_max_output_buffer) // Reconfigure with gr_head in the middle tb->lock(); + gr::block_sptr nop = gr::blocks::nop::make(sizeof(int)); nop->set_max_output_buffer(4000); tb->disconnect(src, 0, dst, 0); diff --git a/gr-fec/python/fec/_qa_helper.py b/gr-fec/python/fec/_qa_helper.py index 8a1b37ec95..00a6015684 100644 --- a/gr-fec/python/fec/_qa_helper.py +++ b/gr-fec/python/fec/_qa_helper.py @@ -46,8 +46,8 @@ class _qa_helper(gr.top_block): self.threading = threading self.ext_encoder = extended_encoder(enc, threading=self.threading, puncpat=self.puncpat) - self.ext_decoder= extended_decoder(dec, threading=self.threading, ann=None, - puncpat=self.puncpat, integration_period=10000) + self.ext_decoder = extended_decoder(dec, threading=self.threading, ann=None, + puncpat=self.puncpat, integration_period=10000) self.src = blocks.vector_source_b(data_size*[0, 1, 2, 3, 5, 7, 9, 13, 15, 25, 31, 45, 63, 95, 127], False) self.unpack = blocks.unpack_k_bits_bb(8) diff --git a/gr-fec/python/fec/extended_encoder.py b/gr-fec/python/fec/extended_encoder.py index 70422da01e..fbcbeb1be1 100644 --- a/gr-fec/python/fec/extended_encoder.py +++ b/gr-fec/python/fec/extended_encoder.py @@ -26,8 +26,8 @@ class extended_encoder(gr.hier_block2): self.blocks=[] self.puncpat=puncpat - if(type(encoder_obj_list) == list): - if(type(encoder_obj_list[0]) == list): + if (type(encoder_obj_list) == list): + if (type(encoder_obj_list[0]) == list): gr.log.info("fec.extended_encoder: Parallelism must be 1.") raise AttributeError else: diff --git a/gr-fec/python/fec/qa_depuncture.py b/gr-fec/python/fec/qa_depuncture.py index 0fdd795c0c..763a40f242 100644 --- a/gr-fec/python/fec/qa_depuncture.py +++ b/gr-fec/python/fec/qa_depuncture.py @@ -65,7 +65,7 @@ class test_depuncture (gr_unittest.TestCase): for i in range(len(dst_data)): dst_data[i] = int(dst_data[i]) - self.assertEqual(self.expected, dst_data) + self.assertSequenceEqualGR(self.expected, dst_data) def test_001(self): # Test normal operation of the depuncture block with a delay @@ -90,7 +90,7 @@ class test_depuncture (gr_unittest.TestCase): for i in range(len(dst_data)): dst_data[i] = int(dst_data[i]) - self.assertEqual(self.expected, dst_data) + self.assertSequenceEqualGR(self.expected, dst_data) def test_002(self): # Test scenario where we have defined a puncture pattern with @@ -116,7 +116,7 @@ class test_depuncture (gr_unittest.TestCase): for i in range(len(dst_data)): dst_data[i] = int(dst_data[i]) - self.assertEqual(self.expected, dst_data) + self.assertSequenceEqualGR(self.expected, dst_data) def test_003(self): # Test scenario where we have defined a puncture pattern with @@ -150,7 +150,7 @@ class test_depuncture (gr_unittest.TestCase): for i in range(len(dst_data1)): dst_data1[i] = int(dst_data1[i]) - self.assertEqual(dst_data1, dst_data0) + self.assertSequenceEqualGR(dst_data1, dst_data0) def test_004(self): # Test normal operation of the depuncture block without @@ -176,7 +176,7 @@ class test_depuncture (gr_unittest.TestCase): for i in range(len(dst_data)): dst_data[i] = int(dst_data[i]) - self.assertEqual(self.expected, dst_data) + self.assertSequenceEqualGR(self.expected, dst_data) if __name__ == '__main__': diff --git a/gr-fec/python/fec/qa_fecapi_dummy.py b/gr-fec/python/fec/qa_fecapi_dummy.py index f1de85e526..fa1abdf789 100644 --- a/gr-fec/python/fec/qa_fecapi_dummy.py +++ b/gr-fec/python/fec/qa_fecapi_dummy.py @@ -26,7 +26,7 @@ class test_fecapi_dummy(gr_unittest.TestCase): def tearDown(self): self.tb = None - + def test_parallelism0_00(self): frame_size = 30 enc = fec.dummy_encoder_make(frame_size * 8) @@ -39,7 +39,7 @@ class test_fecapi_dummy(gr_unittest.TestCase): data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - self.assertEqual(data_in, data_out) + self.assertSequenceEqualGR(data_in, data_out) def test_parallelism0_01(self): frame_size = 30 @@ -52,8 +52,8 @@ class test_fecapi_dummy(gr_unittest.TestCase): data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - - self.assertEqual(data_in, data_out) + + self.assertSequenceEqualGR(data_in, data_out) def test_parallelism0_02(self): frame_size = 30 @@ -66,8 +66,8 @@ class test_fecapi_dummy(gr_unittest.TestCase): data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - - self.assertEqual(data_in, data_out) + + self.assertSequenceEqualGR(data_in, data_out) def test_parallelism1_00(self): frame_size = 30 @@ -83,7 +83,7 @@ class test_fecapi_dummy(gr_unittest.TestCase): data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - self.assertEqual(data_in, data_out) + self.assertSequenceEqualGR(data_in, data_out) def test_parallelism1_01(self): frame_size = 30 @@ -99,7 +99,7 @@ class test_fecapi_dummy(gr_unittest.TestCase): data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - self.assertEqual(data_in, data_out) + self.assertSequenceEqualGR(data_in, data_out) def test_parallelism1_02(self): frame_size = 300 @@ -111,11 +111,10 @@ class test_fecapi_dummy(gr_unittest.TestCase): self.test = _qa_helper(10 * frame_size, enc, dec, threading) self.tb.connect(self.test) self.tb.run() - data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - - self.assertEqual(data_in, data_out) + + self.assertSequenceEqualGR(data_in, data_out) def test_parallelism1_03(self): frame_size = 30 @@ -132,7 +131,7 @@ class test_fecapi_dummy(gr_unittest.TestCase): data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - self.assertEqual(data_in, data_out) + self.assertSequenceEqualGR(data_in, data_out) def test_parallelism1_04(self): frame_size = 30 @@ -149,7 +148,7 @@ class test_fecapi_dummy(gr_unittest.TestCase): data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - self.assertEqual(data_in, data_out) + self.assertSequenceEqualGR(data_in, data_out) def test_parallelism1_05(self): frame_size = 30 @@ -257,10 +256,10 @@ class test_fecapi_dummy(gr_unittest.TestCase): data = list(data) packed_data = list(packed_data) - self.assertListEqual(packed_data, r0) - self.assertListEqual(data, r1) - self.assertListEqual(packed_data, r2) - self.assertListEqual(data, r3) + self.assertSequenceEqualGR(packed_data, r0) + self.assertSequenceEqualGR(data, r1) + self.assertSequenceEqualGR(packed_data, r2) + self.assertSequenceEqualGR(data, r3) if __name__ == '__main__': diff --git a/gr-fec/python/fec/qa_fecapi_repetition.py b/gr-fec/python/fec/qa_fecapi_repetition.py index 9d560b32ef..ec55e2d83d 100644 --- a/gr-fec/python/fec/qa_fecapi_repetition.py +++ b/gr-fec/python/fec/qa_fecapi_repetition.py @@ -36,7 +36,7 @@ class test_fecapi_repetition(gr_unittest.TestCase): data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - self.assertEqual(data_in, data_out) + self.assertSequenceEqualGR(data_in, data_out) def test_parallelism0_01(self): frame_size = 30 @@ -51,7 +51,7 @@ class test_fecapi_repetition(gr_unittest.TestCase): data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - self.assertEqual(data_in, data_out) + self.assertSequenceEqualGR(data_in, data_out) def test_parallelism0_02(self): frame_size = 30 @@ -66,7 +66,7 @@ class test_fecapi_repetition(gr_unittest.TestCase): data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - self.assertEqual(data_in, data_out) + self.assertSequenceEqualGR(data_in, data_out) def test_parallelism1_00(self): frame_size = 30 @@ -83,7 +83,7 @@ class test_fecapi_repetition(gr_unittest.TestCase): data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - self.assertEqual(data_in, data_out) + self.assertSequenceEqualGR(data_in, data_out) def test_parallelism1_01(self): frame_size = 30 @@ -100,7 +100,7 @@ class test_fecapi_repetition(gr_unittest.TestCase): data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - self.assertEqual(data_in, data_out) + self.assertSequenceEqualGR(data_in, data_out) def test_parallelism1_02(self): frame_size = 300 @@ -117,7 +117,7 @@ class test_fecapi_repetition(gr_unittest.TestCase): data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - self.assertEqual(data_in, data_out) + self.assertSequenceEqualGR(data_in, data_out) def test_parallelism1_03(self): frame_size = 30 @@ -135,7 +135,7 @@ class test_fecapi_repetition(gr_unittest.TestCase): data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - self.assertEqual(data_in, data_out) + self.assertSequenceEqualGR(data_in, data_out) def test_parallelism1_04(self): frame_size = 30 @@ -153,7 +153,7 @@ class test_fecapi_repetition(gr_unittest.TestCase): data_in = self.test.snk_input.data() data_out = self.test.snk_output.data() - self.assertEqual(data_in, data_out) + self.assertSequenceEqualGR(data_in, data_out) if __name__ == '__main__': |