diff options
38 files changed, 1655 insertions, 497 deletions
diff --git a/gnuradio-runtime/include/gnuradio/CMakeLists.txt b/gnuradio-runtime/include/gnuradio/CMakeLists.txt index 9e22212b7b..b45b84e4fa 100644 --- a/gnuradio-runtime/include/gnuradio/CMakeLists.txt +++ b/gnuradio-runtime/include/gnuradio/CMakeLists.txt @@ -20,12 +20,14 @@ install(FILES block_gateway.h block_registry.h buffer.h + buffer_context.h buffer_double_mapped.h buffer_reader.h buffer_reader_sm.h buffer_single_mapped.h - constants.h buffer_type.h + constants.h + custom_lock.h endianness.h expj.h flowgraph.h @@ -35,6 +37,7 @@ install(FILES gr_complex.h hier_block2.h high_res_timer.h + host_buffer.h integer_math.h io_signature.h logger.h diff --git a/gnuradio-runtime/include/gnuradio/block.h b/gnuradio-runtime/include/gnuradio/block.h index ee90a35d2c..5fd1316951 100644 --- a/gnuradio-runtime/include/gnuradio/block.h +++ b/gnuradio-runtime/include/gnuradio/block.h @@ -524,7 +524,8 @@ public: void allocate_detail(int ninputs, int noutputs, const std::vector<int>& downstream_max_nitems_vec, - const std::vector<uint64_t>& downstream_lcm_nitems_vec); + const std::vector<uint64_t>& downstream_lcm_nitems_vec, + const std::vector<uint32_t>& downstream_max_out_mult_vec); // --------------- Custom buffer-related functions ------------- @@ -539,55 +540,8 @@ public: * that is incompatible with the default buffer type created by this block. * */ - buffer_sptr replace_buffer(uint32_t out_port, block_sptr block_owner); - - /*! - * \brief Return the type of custom buffer used by the block - * - * \details - * Blocks that wish to allocate custom buffers should override this function. - */ - virtual buffer_type_t get_buffer_type() - { -#if DEBUG_SINGLE_MAPPED - return buftype_CUSTOM_HOST::get(); -#else - return buftype_DEFAULT_NON_CUSTOM::get(); -#endif - } - - /*! - * \brief Allocate a custom buffer for the block - * - * \details - * Blocks that wish to allocate custom buffers should override this function. - * - * \param size the size of the buffer to allocate in bytes - */ - virtual char* allocate_custom_buffer(size_t size) - { -#if DEBUG_SINGLE_MAPPED - return new char[size](); -#else - return nullptr; -#endif - } - - /*! - * \brief Free a custom buffer previously allocated by allocate_custom_buffer() - * - * \details - * Blocks that wish to allocate custom buffers should override this function. - * - * \param buffer a pointer to the buffer - */ - virtual void free_custom_buffer(char* buffer) - { -#if DEBUG_SINGLE_MAPPED - delete[] buffer; -#endif - } - + buffer_sptr + replace_buffer(uint32_t src_port, uint32_t dst_port, block_sptr block_owner); // --------------- Performance counter functions ------------- @@ -989,8 +943,10 @@ protected: * that the downstream max number of items must be passed in to this * function for consideration. */ - buffer_sptr - allocate_buffer(int port, int downstream_max_nitems, uint64_t downstream_lcm_nitems); + buffer_sptr allocate_buffer(int port, + int downstream_max_nitems, + uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult); std::vector<long> d_max_output_buffer; std::vector<long> d_min_output_buffer; diff --git a/gnuradio-runtime/include/gnuradio/buffer.h b/gnuradio-runtime/include/gnuradio/buffer.h index 037fade172..96daf3adcd 100644 --- a/gnuradio-runtime/include/gnuradio/buffer.h +++ b/gnuradio-runtime/include/gnuradio/buffer.h @@ -12,11 +12,13 @@ #define INCLUDED_GR_RUNTIME_BUFFER_H #include <gnuradio/api.h> +#include <gnuradio/buffer_context.h> #include <gnuradio/custom_lock.h> #include <gnuradio/logger.h> #include <gnuradio/runtime_types.h> #include <gnuradio/tags.h> #include <gnuradio/thread/thread.h> + #include <boost/weak_ptr.hpp> #include <iostream> #include <map> @@ -29,7 +31,10 @@ class vmcircbuf; class buffer_reader; class buffer_reader_sm; -enum class BufferMappingType { DoubleMapped, SingleMapped }; +enum class buffer_mapping_type { double_mapped, single_mapped }; + +typedef void* (*memcpy_func_t)(void* dest, const void* src, std::size_t count); +typedef void* (*memmove_func_t)(void* dest, const void* src, std::size_t count); /*! * \brief Allocate a buffer that holds at least \p nitems of size \p sizeof_item. @@ -40,11 +45,18 @@ enum class BufferMappingType { DoubleMapped, SingleMapped }; * * \param nitems is the minimum number of items the buffer will hold. * \param sizeof_item is the size of an item in bytes. + * \param downstream_lcm_nitems is the least common multiple of the items to + * read by downstream block(s) + * \param downstream_max_out_mult is the maximum output multiple of all + * downstream blocks * \param link is the block that writes to this buffer. + * \param buf_owner is the block that owns the buffer which may or may not + * be the same as the block that writes to this buffer */ GR_RUNTIME_API buffer_sptr make_buffer(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, block_sptr link = block_sptr(), block_sptr buf_owner = block_sptr()); @@ -63,7 +75,7 @@ public: /*! * \brief return the buffer's mapping type */ - BufferMappingType get_mapping_type() { return d_buf_map_type; } + buffer_mapping_type get_mapping_type() { return d_buf_map_type; } /*! * \brief return number of items worth of space available for writing @@ -89,6 +101,13 @@ public: virtual void* write_pointer(); /*! + * \brief return pointer to read buffer. + * + * The return value points to at least items_available() items. + */ + virtual const void* _read_pointer(unsigned int read_index); + + /*! * \brief tell buffer that we wrote \p nitems into it */ void update_write_pointer(int nitems); @@ -118,6 +137,8 @@ public: uint64_t get_downstream_lcm_nitems() { return d_downstream_lcm_nitems; } + uint32_t get_max_reader_output_multiple() { return d_max_reader_output_multiple; } + virtual void update_reader_block_history(unsigned history, int delay) { d_max_reader_history = std::max(d_max_reader_history, history); @@ -167,6 +188,32 @@ public: } /*! + * \brief Function to be executed after this object's owner completes the + * call to general_work() + */ + virtual void post_work(int nitems) = 0; + + /*! + * \brief Returns true when the current thread is ready to call the callback, + * false otherwise. Note if input_blocked_callback is overridden then this + * function should also be overridden. + */ + virtual bool input_blkd_cb_ready(int items_required, unsigned read_index) + { + return false; + } + + /*! + * \brief Callback function that the scheduler will call when it determines + * that the input is blocked. Override this function if needed. + */ + virtual bool + input_blocked_callback(int items_required, int items_avail, unsigned read_index) + { + return false; + } + + /*! * \brief Returns true if the current thread is ready to execute * output_blocked_callback(), false otherwise. Note if the default * output_blocked_callback is overridden this function should also be @@ -220,6 +267,11 @@ public: // ------------------------------------------------------------------------- + /*! + * \brief Assign buffer context + */ + void set_context(const buffer_context& context); + private: friend class buffer_reader; friend class buffer_reader_sm; @@ -236,7 +288,7 @@ private: protected: char* d_base; // base address of buffer inside d_vmcircbuf. unsigned int d_bufsize; // in items - BufferMappingType d_buf_map_type; + buffer_mapping_type d_buf_map_type; // Keep track of maximum sample delay of any reader; Only prune tags past this. unsigned d_max_reader_delay; @@ -270,6 +322,9 @@ protected: uint64_t d_downstream_lcm_nitems; uint64_t d_write_multiple; + uint32_t d_max_reader_output_multiple; + + buffer_context d_context; /*! * \brief Increment read or write index for this buffer @@ -281,7 +336,7 @@ protected: */ virtual unsigned index_sub(unsigned a, unsigned b) = 0; - virtual bool allocate_buffer(int nitems, size_t sizeof_item) { return false; }; + virtual bool allocate_buffer(int nitems) { return false; }; /*! * \brief constructor is private. Use gr_make_buffer to create instances. @@ -293,16 +348,19 @@ protected: * \param sizeof_item is the size of an item in bytes. * \param downstream_lcm_nitems is the least common multiple of the items to * read by downstream block(s) + * \param downstream_max_out_mult is the maximum output multiple of all + * downstream blocks * \param link is the block that writes to this buffer. * * The total size of the buffer will be rounded up to a system * dependent boundary. This is typically the system page size, but * under MS windows is 64KB. */ - buffer(BufferMappingType buftype, + buffer(buffer_mapping_type buftype, int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, block_sptr link); /*! diff --git a/gnuradio-runtime/include/gnuradio/buffer_context.h b/gnuradio-runtime/include/gnuradio/buffer_context.h new file mode 100644 index 0000000000..477a3cbd43 --- /dev/null +++ b/gnuradio-runtime/include/gnuradio/buffer_context.h @@ -0,0 +1,30 @@ +/* -*- c++ -*- */ +/* + * Copyright 2021 BlackLynx, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifndef INCLUDED_GR_RUNTIME_BUFFER_CONTEXT_H +#define INCLUDED_GR_RUNTIME_BUFFER_CONTEXT_H + +#include <gnuradio/api.h> +#include <ostream> + +namespace gr { + +enum class buffer_context { + DEFAULT_INVALID, + HOST_TO_DEVICE, + DEVICE_TO_HOST, + HOST_TO_HOST, + DEVICE_TO_DEVICE +}; + +GR_RUNTIME_API std::ostream& operator<<(std::ostream& os, const buffer_context& context); +} // namespace gr + +#endif diff --git a/gnuradio-runtime/include/gnuradio/buffer_double_mapped.h b/gnuradio-runtime/include/gnuradio/buffer_double_mapped.h index b8cc7cdc87..e23432bb13 100644 --- a/gnuradio-runtime/include/gnuradio/buffer_double_mapped.h +++ b/gnuradio-runtime/include/gnuradio/buffer_double_mapped.h @@ -13,6 +13,7 @@ #include <gnuradio/api.h> #include <gnuradio/buffer.h> +#include <gnuradio/buffer_type.h> #include <gnuradio/logger.h> #include <gnuradio/runtime_types.h> @@ -28,7 +29,11 @@ class vmcircbuf; GR_RUNTIME_API buffer_sptr make_buffer_double_mapped(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, - block_sptr link = block_sptr()); + uint32_t downstream_max_out_mult, + block_sptr link = block_sptr(), + block_sptr buf_owner = block_sptr()); + +MAKE_CUSTOM_BUFFER_TYPE(DEFAULT_NON_CUSTOM, make_buffer_double_mapped); /*! * \brief Single writer, multiple reader fifo. @@ -37,6 +42,8 @@ GR_RUNTIME_API buffer_sptr make_buffer_double_mapped(int nitems, class GR_RUNTIME_API buffer_double_mapped : public buffer { public: + static buffer_type type; + gr::logger_ptr d_logger; gr::logger_ptr d_debug_logger; @@ -47,12 +54,18 @@ public: */ virtual int space_available(); + /*! + * Inherited from buffer class. + * @param nitems is the number of items produced by the general_work() function. + */ + virtual void post_work(int nitems) {} + protected: /*! * sets d_vmcircbuf, d_base, d_bufsize. * returns true iff successful. */ - bool allocate_buffer(int nitems, size_t sizeof_item); + bool allocate_buffer(int nitems); virtual unsigned index_add(unsigned a, unsigned b) { @@ -84,8 +97,13 @@ private: uint64_t downstream_lcm_nitems, block_sptr link, block_sptr buf_owner); - friend GR_RUNTIME_API buffer_sptr make_buffer_double_mapped( - int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, block_sptr link); + friend GR_RUNTIME_API buffer_sptr + make_buffer_double_mapped(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, + block_sptr link, + block_sptr buf_owner); std::unique_ptr<gr::vmcircbuf> d_vmcircbuf; @@ -107,6 +125,7 @@ private: buffer_double_mapped(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, block_sptr link); }; diff --git a/gnuradio-runtime/include/gnuradio/buffer_reader.h b/gnuradio-runtime/include/gnuradio/buffer_reader.h index a7e51d02e9..4885d9b4c1 100644 --- a/gnuradio-runtime/include/gnuradio/buffer_reader.h +++ b/gnuradio-runtime/include/gnuradio/buffer_reader.h @@ -51,7 +51,6 @@ class GR_RUNTIME_API buffer_reader { public: #ifdef BUFFER_DEBUG - // BUFFER DEBUG gr::logger_ptr d_logger; gr::logger_ptr d_debug_logger; #endif @@ -77,7 +76,7 @@ public: /*! * \brief Return number of items available for reading. */ - virtual int items_available(); // const + virtual int items_available() const; /*! * \brief Return buffer this reader reads from. @@ -121,7 +120,7 @@ public: * \brief Return the block that reads via this reader. * */ - block_sptr link() { return block_sptr(d_link); } + block_sptr link() const { return block_sptr(d_link); } /*! * \brief Given a [start,end), returns a vector all tags in the range. @@ -144,14 +143,16 @@ public: /*! * \brief Returns true when the current thread is ready to call the callback, - * false otherwise. Note if input_blocked_callback is overridden then this - * function should also be overridden. + * false otherwise. Delegate calls to buffer class's input_blkd_cb_ready(). + * Note if input_blocked_callback is overridden then this function should + * also be overridden. */ virtual bool input_blkd_cb_ready(int items_required) const { return false; } /*! * \brief Callback function that the scheduler will call when it determines - * that the input is blocked. Override this function if needed. + * that the input is blocked. Delegate calls to buffer class's + * input_blocked_callback(). Override this function if needed. */ virtual bool input_blocked_callback(int items_required, int items_avail) { diff --git a/gnuradio-runtime/include/gnuradio/buffer_reader_sm.h b/gnuradio-runtime/include/gnuradio/buffer_reader_sm.h index 627bfd8582..7e203c93b7 100644 --- a/gnuradio-runtime/include/gnuradio/buffer_reader_sm.h +++ b/gnuradio-runtime/include/gnuradio/buffer_reader_sm.h @@ -32,17 +32,18 @@ public: /*! * \brief Return number of items available for reading. */ - virtual int items_available(); // const + virtual int items_available() const; /*! * \brief Return true if thread is ready to call input_blocked_callback, - * false otherwise + * false otherwise; delegate calls to buffer class's input_blkd_cb_ready() */ virtual bool input_blkd_cb_ready(int items_required) const; /*! * \brief Callback function that the scheduler will call when it determines - * that the input is blocked + * that the input is blocked; delegate calls to buffer class's + * input_blocked_callback() */ virtual bool input_blocked_callback(int items_required, int items_avail); diff --git a/gnuradio-runtime/include/gnuradio/buffer_single_mapped.h b/gnuradio-runtime/include/gnuradio/buffer_single_mapped.h index 5377534aa8..3b2edc0aef 100644 --- a/gnuradio-runtime/include/gnuradio/buffer_single_mapped.h +++ b/gnuradio-runtime/include/gnuradio/buffer_single_mapped.h @@ -11,20 +11,20 @@ #ifndef INCLUDED_GR_RUNTIME_BUFFER_SINGLE_MAPPED_H #define INCLUDED_GR_RUNTIME_BUFFER_SINGLE_MAPPED_H +#include <cstddef> #include <functional> #include <gnuradio/api.h> -#include <gnuradio/block.h> #include <gnuradio/buffer.h> +#include <gnuradio/buffer_reader.h> #include <gnuradio/logger.h> #include <gnuradio/runtime_types.h> namespace gr { /*! - * TODO: update this - * - * \brief Single writer, multiple reader fifo. + * \brief A single mapped buffer where wrapping conditions are handled explicitly + * via input/output_blocked_callback functions called from block_executor. * \ingroup internal */ class GR_RUNTIME_API buffer_single_mapped : public buffer @@ -45,43 +45,20 @@ public: */ virtual int space_available(); - virtual void update_reader_block_history(unsigned history, int delay) - { - unsigned old_max = d_max_reader_history; - d_max_reader_history = std::max(d_max_reader_history, history); - if (d_max_reader_history != old_max) { - d_write_index = d_max_reader_history - 1; - -#ifdef BUFFER_DEBUG - std::ostringstream msg; - msg << "[" << this << "] " - << "buffer_single_mapped constructor -- set wr index to: " - << d_write_index; - GR_LOG_DEBUG(d_logger, msg.str()); -#endif - - // Reset the reader's read index if the buffer's write index has changed. - // Note that "history - 1" is the nzero_preload value passed to - // buffer_add_reader. - for (auto reader : d_readers) { - reader->d_read_index = d_write_index - (reader->link()->history() - 1); - } - } - - // Only attempt to set has history flag if it is not already set - if (!d_has_history) { - // Blocks that set delay may set history to delay + 1 but this is - // not "real" history - d_has_history = ((static_cast<int>(history) - 1) != delay); - } - } + virtual void update_reader_block_history(unsigned history, int delay); - void deleter(char* ptr) - { - // Delegate free of the underlying buffer to the block that owns it - if (ptr != nullptr) - buf_owner()->free_custom_buffer(ptr); - } + /*! + * \brief Return true if thread is ready to call input_blocked_callback, + * false otherwise + */ + virtual bool input_blkd_cb_ready(int items_required, unsigned read_index); + + /*! + * \brief Callback function that the scheduler will call when it determines + * that the input is blocked. Override this function if needed. + */ + virtual bool + input_blocked_callback(int items_required, int items_avail, unsigned read_index) = 0; /*! * \brief Return true if thread is ready to call the callback, false otherwise @@ -92,14 +69,21 @@ public: * \brief Callback function that the scheduler will call when it determines * that the output is blocked */ - virtual bool output_blocked_callback(int output_multiple, bool force); + virtual bool output_blocked_callback(int output_multiple, bool force) = 0; protected: /*! - * sets d_base, d_bufsize. - * returns true iff successful. + * \brief Make reasonable attempt to adjust nitems based on read/write + * granularity then delegate actual allocation to do_allocate_buffer(). + * @return true iff successful. + */ + virtual bool allocate_buffer(int nitems); + + /*! + * \brief Do actual buffer allocation. This is intended (required) to be + * handled by the derived class. */ - bool allocate_buffer(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems); + virtual bool do_allocate_buffer(size_t final_nitems, size_t sizeof_item) = 0; virtual unsigned index_add(unsigned a, unsigned b) { @@ -124,7 +108,7 @@ protected: return s; } -private: + friend class buffer_reader; friend GR_RUNTIME_API buffer_sptr make_buffer(int nitems, @@ -135,7 +119,7 @@ private: block_sptr d_buf_owner; // block that "owns" this buffer - std::unique_ptr<char, std::function<void(char*)>> d_buffer; + std::unique_ptr<char> d_buffer; /*! * \brief constructor is private. Use gr_make_buffer to create instances. @@ -157,8 +141,70 @@ private: buffer_single_mapped(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, block_sptr link, block_sptr buf_owner); + + /*! + * \brief Abstracted logic for the input blocked callback function. + * + * This function contains the logic for the input blocked callback however + * the data adjustment portion of the callback has been abstracted to allow + * the caller to pass in the desired buffer and corresponding buffer + * manipulation functions (memcpy and memmove). + * + * The input blocked callback is called when a reader needs to read more + * data than is available in a buffer and the available data is located at + * the end of the buffer. The input blocked callback will attempt to move + * any data located at the beginning of the buffer "down", and will then + * attempt to copy from the end of the buffer back to the beginning of the + * buffer. This process explicitly handles wrapping for a single mapped + * buffer and will realign the data at the beginning of the buffer such + * that the reader is able to read the available data and becomes unblocked. + * + * \param items_required is the number of items required by the reader + * \param items_avail is the number of items available + * \param read_index is the current read index of the buffer reader caller + * \param buffer_ptr is the pointer to the desired buffer + * \param memcpy_func is a pointer to a memcpy function appropriate for the + * the passed in buffer + * \param memmove_func is a pointer to a memmove function appropriate for + * the passed in buffer + */ + virtual bool input_blocked_callback_logic(int items_required, + int items_avail, + unsigned read_index, + char* buffer_ptr, + memcpy_func_t memcpy_func, + memmove_func_t memmove_func); + + /*! + * \brief Abstracted logic for the output blocked callback function. + * + * This function contains the logic for the output blocked callback however + * the data adjustment portion of the callback has been abstracted to allow + * the caller to pass in the desired buffer and corresponding buffer + * manipulation functions (memcpy and memmove). + * + * The output blocked callback is called when a block needs to write data + * to the end of a single mapped buffer but not enough free space exists to + * write the data before the end of the buffer is reached. The output blocked + * callback will attempt to copy data located towards the end of a single + * mapped buffer back to the beginning of the buffer. This process explicitly + * handles wrapping for a single mapped buffer and will realign data located + * at the end of a buffer back to the beginning of the buffer such that the + * writing block can write its output into the buffer after the existing data. + * + * \param output_multiple + * \param force run the callback disregarding the internal checks + * \param buffer_ptr is the pointer to the desired buffer + * \param memmove_func is a pointer to a memmove function appropriate for + * the passed in buffer + */ + virtual bool output_blocked_callback_logic(int output_multiple, + bool force, + char* buffer_ptr, + memmove_func_t memmove_func); }; } /* namespace gr */ diff --git a/gnuradio-runtime/include/gnuradio/buffer_type.h b/gnuradio-runtime/include/gnuradio/buffer_type.h index 527b5ff8e3..3d69f72ca6 100644 --- a/gnuradio-runtime/include/gnuradio/buffer_type.h +++ b/gnuradio-runtime/include/gnuradio/buffer_type.h @@ -12,32 +12,55 @@ #define INCLUDED_GR_RUNTIME_CUSTOM_BUFFER_TYPE_H #include <gnuradio/api.h> +#include <gnuradio/runtime_types.h> #include <cstdint> +#include <functional> #include <mutex> #include <string> +#include <vector> namespace gr { +// This is the function pointer declaration for the factory-like functions +// used to create buffer subclasses +typedef buffer_sptr (*factory_func_ptr)(int nitems, + size_t sizeof_item, + uint64_t downstrea_lcm_nitems, + uint32_t downstream_max_out_mult, + block_sptr link, + block_sptr buf_owner); + +/*! + * \brief Base class for describing a buffer's type. + */ class GR_RUNTIME_API buffer_type_base { public: virtual ~buffer_type_base(){}; // Do not allow copying or assignment - buffer_type_base(buffer_type_base const&) = delete; + // buffer_type_base(buffer_type_base const&) = delete; + + // Temporarily define copy constructor to work around pybind issue with + // default non-copyable function argument + buffer_type_base(buffer_type_base const& other) + : d_name(other.d_name), d_factory(other.d_factory) + { + } + void operator=(buffer_type_base const&) = delete; // Allow equality and inequality comparison bool operator==(const buffer_type_base& other) const { - return d_value == other.d_value; + return d_name == other.d_name; } bool operator!=(const buffer_type_base& other) const { - return d_value != other.d_value; + return d_name != other.d_name; } // Do not allow other comparison (just in case) @@ -45,44 +68,51 @@ public: bool operator>(const buffer_type_base& other) = delete; bool operator<=(const buffer_type_base& other) = delete; bool operator>=(const buffer_type_base& other) = delete; - + /*! + * \brief Get the human-readable name of the type + */ const std::string& name() const { return d_name; } -protected: - static uint32_t s_nextId; - static std::mutex s_mutex; + /*! + * \brief Make and return a buffer subclass of the corresponding type + */ + inline buffer_sptr make_buffer(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, + block_sptr link, + block_sptr buf_owner) const + { + // Delegate call to factory function + return d_factory(nitems, + sizeof_item, + downstream_lcm_nitems, + downstream_max_out_mult, + link, + buf_owner); + } - uint32_t d_value; - std::string d_name; +protected: + const std::string d_name; + factory_func_ptr d_factory; - // Private constructor - buffer_type_base(const char* name) : d_name(name) + // Protected constructor + buffer_type_base(const char* name, factory_func_ptr factory_func) + : d_name(name), d_factory(factory_func) { - std::lock_guard<std::mutex> lock(s_mutex); - d_value = s_nextId++; } }; -typedef const buffer_type_base& buffer_type_t; - - -#define MAKE_CUSTOM_BUFFER_TYPE(CLASSNAME) \ - class GR_RUNTIME_API buftype_##CLASSNAME : public buffer_type_base \ - { \ - public: \ - static buffer_type_t get() \ - { \ - static buftype_##CLASSNAME instance; \ - return instance; \ - } \ - \ - private: \ - buftype_##CLASSNAME() : buffer_type_base(#CLASSNAME) {} \ - }; +typedef const buffer_type_base& buffer_type; +typedef std::vector<std::reference_wrapper<const buffer_type_base>> gr_vector_buffer_type; -MAKE_CUSTOM_BUFFER_TYPE(DEFAULT_NON_CUSTOM); -MAKE_CUSTOM_BUFFER_TYPE(CUSTOM_HOST); // used only for test purposes +#define MAKE_CUSTOM_BUFFER_TYPE(CLASSNAME, FACTORY_FUNC_PTR) \ + class GR_RUNTIME_API buftype_##CLASSNAME : public buffer_type_base \ + { \ + public: \ + buftype_##CLASSNAME() : buffer_type_base(#CLASSNAME, FACTORY_FUNC_PTR) {} \ + }; } // namespace gr -#endif /* INCLUDED_GR_RUNTIME_CUSTOM_BUFFER_TYPE_H */
\ No newline at end of file +#endif /* INCLUDED_GR_RUNTIME_CUSTOM_BUFFER_TYPE_H */ diff --git a/gnuradio-runtime/include/gnuradio/custom_lock.h b/gnuradio-runtime/include/gnuradio/custom_lock.h index 6422254d99..5011abe5eb 100644 --- a/gnuradio-runtime/include/gnuradio/custom_lock.h +++ b/gnuradio-runtime/include/gnuradio/custom_lock.h @@ -19,7 +19,9 @@ namespace gr { /*! * Custom lock interface. Objects should implement this interface in order to - * use the custom_lock object below. + * use the custom_lock object below. The interface defines two functions that, + * as their names suggest, are called when the lock is locked and unlocked + * respectively. */ class custom_lock_if { @@ -36,7 +38,11 @@ public: }; /*! - * Write me! + * Class that defines a lock using a mutex and a "locker" that implements the + * custom_lock_if interface. The interface defines an on_lock() function that + * is executed when the lock is locked and an on_unlock() function that the + * is called when the lock is unlocked. Calls to these two functions are + * delegated to the locker object. */ class custom_lock { diff --git a/gnuradio-runtime/include/gnuradio/host_buffer.h b/gnuradio-runtime/include/gnuradio/host_buffer.h new file mode 100644 index 0000000000..83f46d2e85 --- /dev/null +++ b/gnuradio-runtime/include/gnuradio/host_buffer.h @@ -0,0 +1,126 @@ +/* -*- c++ -*- */ +/* + * Copyright 2021 BlackLynx Inc. + * + * SPDX-License-Identifier: GPL-3.0-or-later + */ + +#ifndef INCLUDED_HOST_BUFFER_H +#define INCLUDED_HOST_BUFFER_H + +#include <gnuradio/buffer_single_mapped.h> +#include <gnuradio/buffer_type.h> +#include <cstddef> + +namespace gr { + + +class GR_RUNTIME_API host_buffer : public buffer_single_mapped +{ +public: + static void* device_memcpy(void* dest, const void* src, std::size_t count); + static void* device_memmove(void* dest, const void* src, std::size_t count); + + static buffer_type type; + + virtual ~host_buffer(); + + /*! + * \brief Handles post-general_work() cleanup and data transfer + * + * Called directly after call to general_work() completes and + * is used for data transfer (and perhaps other administrative + * activities) + * + * \param nitems is the number of items produced by the general_work() function. + */ + void post_work(int nitems); + + /*! + * \brief Do actual buffer allocation. Inherited from buffer_single_mapped. + */ + bool do_allocate_buffer(size_t final_nitems, size_t sizeof_item); + + /*! + * \brief Return a pointer to the write buffer depending on the context + */ + virtual void* write_pointer(); + + /*! + * \brief return pointer to read buffer depending on the context + * + * The return value points to at least items_available() items. + */ + virtual const void* _read_pointer(unsigned int read_index); + + /*! + * \brief Callback function that the scheduler will call when it determines + * that the input is blocked. Override this function if needed. + */ + virtual bool + input_blocked_callback(int items_required, int items_avail, unsigned read_index); + + /*! + * \brief Callback function that the scheduler will call when it determines + * that the output is blocked + */ + virtual bool output_blocked_callback(int output_multiple, bool force); + + /*! + * \brief Creates a new host_buffer object + * + * \param nitems + * \param sizeof_item + * \param downstream_lcm_nitems + * \param link + * \param buf_owner + * + * \return pointer to buffer base class + */ + static buffer_sptr make_host_buffer(int nitems, + std::size_t sizeof_item, + uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, + block_sptr link, + block_sptr buf_owner); + +private: + // This is the simulated device buffer + std::unique_ptr<char> d_device_buf; + char* d_device_base; + + /*! + * \brief constructor is private. Use the static make_host_buffer function + * to create instances. + * + * Allocate a buffer that holds at least \p nitems of size \p sizeof_item. + * + * \param nitems is the minimum number of items the buffer will hold. + * \param sizeof_item is the size of an item in bytes. + * \param downstream_lcm_nitems is the least common multiple of the items to + * read by downstream blocks + * \param downstream_max_out_mult is the maximum output multiple of all + * downstream blocks + * \param link is the block that writes to this buffer. + * \param buf_owner if the block that owns the buffer which may or may not + * be the same as the block that writes to this buffer + * + * The total size of the buffer will be rounded up to a system + * dependent boundary. This is typically the system page size, but + * under MS windows is 64KB. + */ + host_buffer(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, + block_sptr link, + block_sptr buf_owner); +}; + +// See buffer_type.h for details on this macro. It is used here to generate +// compile-time class representing the host_buffer classes "type". +MAKE_CUSTOM_BUFFER_TYPE(HOST_BUFFER, &host_buffer::make_host_buffer); + +} // namespace gr + +#endif /* INCLUDED_HOST_BUFFER_H */ diff --git a/gnuradio-runtime/include/gnuradio/io_signature.h b/gnuradio-runtime/include/gnuradio/io_signature.h index 8d38e0d44c..60ac798f79 100644 --- a/gnuradio-runtime/include/gnuradio/io_signature.h +++ b/gnuradio-runtime/include/gnuradio/io_signature.h @@ -11,7 +11,11 @@ #ifndef INCLUDED_IO_SIGNATURE_H #define INCLUDED_IO_SIGNATURE_H +#include <functional> + #include <gnuradio/api.h> +#include <gnuradio/buffer_double_mapped.h> +#include <gnuradio/buffer_type.h> #include <gnuradio/runtime_types.h> namespace gr { @@ -25,10 +29,12 @@ class GR_RUNTIME_API io_signature int d_min_streams; int d_max_streams; std::vector<int> d_sizeof_stream_item; + gr_vector_buffer_type d_stream_buffer_type; io_signature(int min_streams, int max_streams, - const std::vector<int>& sizeof_stream_items); + const std::vector<int>& sizeof_stream_items, + gr_vector_buffer_type buftypes); public: typedef std::shared_ptr<io_signature> sptr; @@ -44,8 +50,13 @@ public: * \param min_streams specify minimum number of streams (>= 0) * \param max_streams specify maximum number of streams (>= min_streams or -1 -> * infinite) \param sizeof_stream_item specify the size of the items in each stream + * \param buftype type of buffers the streams should use (defaults to standard host + * double mapped buffer) */ - static sptr make(int min_streams, int max_streams, int sizeof_stream_item); + static sptr make(int min_streams, + int max_streams, + int sizeof_stream_item, + buffer_type buftype = buffer_double_mapped::type); /*! * \brief Create an i/o signature @@ -55,11 +66,17 @@ public: * infinite) \param sizeof_stream_item1 specify the size of the items in the first * stream \param sizeof_stream_item2 specify the size of the items in the second and * subsequent streams + * \param buftype1 type of buffers the first stream should use (defaults to standard + * host double mapped buffer) + * \param buftype2 type of buffers the second and subsequent streams should use + * (defaults to standard host double mapped buffer) */ static sptr make2(int min_streams, int max_streams, int sizeof_stream_item1, - int sizeof_stream_item2); + int sizeof_stream_item2, + buffer_type buftype1 = buffer_double_mapped::type, + buffer_type buftype2 = buffer_double_mapped::type); /*! * \brief Create an i/o signature @@ -70,12 +87,21 @@ public: * stream \param sizeof_stream_item2 specify the size of the items in the second * stream \param sizeof_stream_item3 specify the size of the items in the third and * subsequent streams + * \param buftype1 type of buffers the first stream should use (defaults to standard + * host double mapped buffer) + * \param buftype2 type of buffers the second stream should use (defaults to standard + * host double mapped buffer) + * \param buftype3 type of buffers the third and subsequent streams should use + * (defaults to standard host double mapped buffer) */ static sptr make3(int min_streams, int max_streams, int sizeof_stream_item1, int sizeof_stream_item2, - int sizeof_stream_item3); + int sizeof_stream_item3, + buffer_type buftype1 = buffer_double_mapped::type, + buffer_type buftype2 = buffer_double_mapped::type, + buffer_type buftype3 = buffer_double_mapped::type); /*! * \brief Create an i/o signature @@ -92,10 +118,30 @@ public: static sptr makev(int min_streams, int max_streams, const std::vector<int>& sizeof_stream_items); + /*! + * \brief Create an i/o signature + * + * \param min_streams specify minimum number of streams (>= 0) + * \param max_streams specify maximum number of streams (>= min_streams or -1 -> + * infinite) \param sizeof_stream_items specify the size of the items in the streams + * \param buftypes the type of buffer each stream will should use + * + * If there are more streams than there are entries in + * sizeof_stream_items, the value of the last entry in + * sizeof_stream_items is used for the missing values. + * sizeof_stream_items must contain at least 1 entry. + */ + static sptr makev(int min_streams, + int max_streams, + const std::vector<int>& sizeof_stream_items, + gr_vector_buffer_type buftypes); + int min_streams() const { return d_min_streams; } int max_streams() const { return d_max_streams; } int sizeof_stream_item(int index) const; std::vector<int> sizeof_stream_items() const; + buffer_type stream_buffer_type(size_t index) const; + gr_vector_buffer_type stream_buffer_types() const; }; } /* namespace gr */ diff --git a/gnuradio-runtime/lib/CMakeLists.txt b/gnuradio-runtime/lib/CMakeLists.txt index ca96549386..6fcd31bac4 100644 --- a/gnuradio-runtime/lib/CMakeLists.txt +++ b/gnuradio-runtime/lib/CMakeLists.txt @@ -55,16 +55,17 @@ add_library(gnuradio-runtime block_gateway_impl.cc block_registry.cc buffer.cc + buffer_context.cc buffer_double_mapped.cc buffer_reader.cc buffer_reader_sm.cc buffer_single_mapped.cc - buffer_type.cc flat_flowgraph.cc flowgraph.cc hier_block2.cc hier_block2_detail.cc high_res_timer.cc + host_buffer.cc io_signature.cc local_sighandler.cc logger.cc @@ -335,6 +336,7 @@ if(ENABLE_TESTING) qa_buffer.cc qa_io_signature.cc qa_logger.cc + qa_host_buffer.cc qa_vmcircbuf.cc ) list(APPEND GR_TEST_TARGET_DEPS gnuradio-runtime gnuradio-pmt) diff --git a/gnuradio-runtime/lib/block.cc b/gnuradio-runtime/lib/block.cc index bb6ce95298..75f9dc6ae4 100644 --- a/gnuradio-runtime/lib/block.cc +++ b/gnuradio-runtime/lib/block.cc @@ -384,7 +384,8 @@ void block::set_min_output_buffer(int port, long min_output_buffer) void block::allocate_detail(int ninputs, int noutputs, const std::vector<int>& downstream_max_nitems_vec, - const std::vector<uint64_t>& downstream_lcm_nitems_vec) + const std::vector<uint64_t>& downstream_lcm_nitems_vec, + const std::vector<uint32_t>& downstream_max_out_mult_vec) { block_detail_sptr detail = make_block_detail(ninputs, noutputs); @@ -393,8 +394,10 @@ void block::allocate_detail(int ninputs, for (int i = 0; i < noutputs; i++) { expand_minmax_buffer(i); - buffer_sptr buffer = allocate_buffer( - i, downstream_max_nitems_vec[i], downstream_lcm_nitems_vec[i]); + buffer_sptr buffer = allocate_buffer(i, + downstream_max_nitems_vec[i], + downstream_lcm_nitems_vec[i], + downstream_max_out_mult_vec[i]); GR_LOG_DEBUG(d_debug_logger, "Allocated buffer for output " + identifier() + " " + std::to_string(i)); @@ -413,19 +416,24 @@ void block::allocate_detail(int ninputs, set_detail(detail); } -buffer_sptr block::replace_buffer(uint32_t out_port, block_sptr block_owner) +buffer_sptr +block::replace_buffer(uint32_t src_port, uint32_t dst_port, block_sptr block_owner) { block_detail_sptr detail_ = detail(); - buffer_sptr orig_buffer = detail_->output(out_port); + buffer_sptr orig_buffer = detail_->output(src_port); - // Make a new buffer but this time use the passed in block as the owner - buffer_sptr new_buffer = make_buffer(orig_buffer->bufsize(), - orig_buffer->get_sizeof_item(), - orig_buffer->get_downstream_lcm_nitems(), - shared_from_base<block>(), - block_owner); + buffer_type buftype = block_owner->output_signature()->stream_buffer_type(dst_port); - detail_->set_output(out_port, new_buffer); + // Make a new buffer but this time use the passed in block as the owner + buffer_sptr new_buffer = + buftype.make_buffer(orig_buffer->bufsize(), + orig_buffer->get_sizeof_item(), + orig_buffer->get_downstream_lcm_nitems(), + orig_buffer->get_max_reader_output_multiple(), + shared_from_base<block>(), + block_owner); + + detail_->set_output(src_port, new_buffer); return new_buffer; } @@ -435,7 +443,8 @@ void block::enable_update_rate(bool en) { d_update_rate = en; } buffer_sptr block::allocate_buffer(int port, int downstream_max_nitems, - uint64_t downstream_lcm_nitems) + uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult) { int item_size = output_signature()->sizeof_stream_item(port); @@ -473,14 +482,16 @@ buffer_sptr block::allocate_buffer(int port, buffer_sptr buf; #ifdef BUFFER_DEBUG - // BUFFER DEBUG GR_LOG_DEBUG(d_logger, "Block: " + name() + " allocated buffer for output " + identifier()); #endif + // Grab the buffer type associated with the output port and use it to + // create the specified type of buffer + buffer_type buftype = output_signature()->stream_buffer_type(port); + try { #ifdef BUFFER_DEBUG - // BUFFER DEBUG std::ostringstream msg; msg << "downstream_max_nitems: " << downstream_max_nitems << " -- downstream_lcm_nitems: " << downstream_lcm_nitems @@ -498,18 +509,20 @@ buffer_sptr block::allocate_buffer(int port, } GR_LOG_DEBUG(d_logger, msg.str()); #endif - buf = make_buffer(nitems, - item_size, - downstream_lcm_nitems, - shared_from_base<block>(), - shared_from_base<block>()); + buf = buftype.make_buffer(nitems, + item_size, + downstream_lcm_nitems, + downstream_max_out_mult, + shared_from_base<block>(), + shared_from_base<block>()); } catch (std::bad_alloc&) { - buf = make_buffer(nitems, - item_size, - downstream_lcm_nitems, - shared_from_base<block>(), - shared_from_base<block>()); + buf = buftype.make_buffer(nitems, + item_size, + downstream_lcm_nitems, + downstream_max_out_mult, + shared_from_base<block>(), + shared_from_base<block>()); } // Set the max noutput items size here to make sure it's always diff --git a/gnuradio-runtime/lib/block_detail.cc b/gnuradio-runtime/lib/block_detail.cc index f5283c56b0..66b32ec078 100644 --- a/gnuradio-runtime/lib/block_detail.cc +++ b/gnuradio-runtime/lib/block_detail.cc @@ -114,6 +114,7 @@ void block_detail::consume_each(int how_many_items) void block_detail::produce(int which_output, int how_many_items) { if (how_many_items > 0) { + d_output[which_output]->post_work(how_many_items); d_output[which_output]->update_write_pointer(how_many_items); d_produce_or |= how_many_items; } @@ -123,6 +124,7 @@ void block_detail::produce_each(int how_many_items) { if (how_many_items > 0) { for (int i = 0; i < noutputs(); i++) { + d_output[i]->post_work(how_many_items); d_output[i]->update_write_pointer(how_many_items); } d_produce_or |= how_many_items; diff --git a/gnuradio-runtime/lib/block_executor.cc b/gnuradio-runtime/lib/block_executor.cc index 6fbf2c5c17..ea59196571 100644 --- a/gnuradio-runtime/lib/block_executor.cc +++ b/gnuradio-runtime/lib/block_executor.cc @@ -59,9 +59,11 @@ static int min_available_space(block* m, int min_noutput_items, int& output_idx) { +#if ENABLE_LOGGING gr::logger_ptr logger; gr::logger_ptr debug_logger; gr::configure_default_loggers(logger, debug_logger, "min_available_space"); +#endif int min_space = std::numeric_limits<int>::max(); if (min_noutput_items == 0) @@ -285,7 +287,7 @@ block_executor::state block_executor::run_one_iteration() // determine the minimum available output space output_idx = 0; - out_try_again: + blkd_out_try_again: noutput_items = min_available_space( m, d, m->output_multiple(), m->min_noutput_items(), output_idx); noutput_items = std::min(noutput_items, max_noutput_items); @@ -312,7 +314,7 @@ block_executor::state block_executor::run_one_iteration() msg << m << " -- BLKD_OUT -- ([1] try again idx: " << output_idx << ")"; GR_LOG_INFO(d_debug_logger, msg.str());); - goto out_try_again; + goto blkd_out_try_again; } } else { return BLKD_OUT; @@ -329,7 +331,6 @@ block_executor::state block_executor::run_one_iteration() d_input_done.resize(d->ninputs()); d_output_items.resize(0); d_start_nitems_read.resize(d->ninputs()); - // LOG(GR_LOG_INFO(d_debug_logger, "sink");); LOG(std::ostringstream msg; msg << m << " -- sink"; GR_LOG_INFO(d_debug_logger, msg.str());); @@ -367,7 +368,6 @@ block_executor::state block_executor::run_one_iteration() GR_LOG_INFO(d_debug_logger, msg.str());); if (noutput_items == 0) { // we're blocked on input - // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN");); LOG(std::ostringstream msg; msg << m << " -- BLKD_IN"; GR_LOG_INFO(d_debug_logger, msg.str())); return BLKD_IN; @@ -400,7 +400,7 @@ block_executor::state block_executor::run_one_iteration() // determine the minimum available output space output_idx = 0; - out_try_again2: + blkd_out_try_again2: noutput_items = min_available_space( m, d, m->output_multiple(), m->min_noutput_items(), output_idx); if (ENABLE_LOGGING) { @@ -415,7 +415,6 @@ block_executor::state block_executor::run_one_iteration() goto were_done; if (noutput_items == 0) { // we're output blocked - // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT");); LOG(std::ostringstream msg; msg << m << " -- BLKD_OUT"; GR_LOG_INFO(d_debug_logger, msg.str())); @@ -435,7 +434,7 @@ block_executor::state block_executor::run_one_iteration() msg << m << " -- BLKD_OUT -- ([2] try again idx: " << output_idx << ")"; GR_LOG_INFO(d_debug_logger, msg.str());); - goto out_try_again2; + goto blkd_out_try_again2; } } else { return BLKD_OUT; @@ -532,10 +531,6 @@ block_executor::state block_executor::run_one_iteration() buffer_reader_sptr in_buf = d->input(i); - LOG(std::ostringstream msg; - msg << m << " (t: " << this << ") -- pre-callback"; - GR_LOG_DEBUG(d_debug_logger, msg.str())); - if (in_buf->input_blkd_cb_ready(d_ninput_items_required[i])) { gr::custom_lock lock(std::ref(*in_buf->mutex()), in_buf->buffer()); if (in_buf->input_blocked_callback(d_ninput_items_required[i], @@ -681,8 +676,8 @@ block_executor::state block_executor::run_one_iteration() GR_LOG_DEBUG(d_debug_logger, msg.str());); gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf); out_buf->output_blocked_callback(m->output_multiple(), true); - LOG(std::ostringstream msg; msg << m << " -- NO OUTPUT -- [" << i - << "] -- OUTPUT BLOCKED CBACK: " << rc; + LOG(std::ostringstream msg; + msg << m << " -- NO OUTPUT -- [" << i << "] -- OUTPUT BLOCKED CBACK "; GR_LOG_DEBUG(d_debug_logger, msg.str());); } @@ -692,7 +687,6 @@ block_executor::state block_executor::run_one_iteration() GR_LOG_ERROR(d_logger, "invalid state while going through iteration state machine"); were_done: - // LOG(GR_LOG_INFO(d_debug_logger, "we're done");); LOG(std::ostringstream msg; msg << m << " -- we're done"; GR_LOG_INFO(d_debug_logger, msg.str())); d->set_done(true); diff --git a/gnuradio-runtime/lib/buffer.cc b/gnuradio-runtime/lib/buffer.cc index 7fd0a39579..75fc447e8d 100644 --- a/gnuradio-runtime/lib/buffer.cc +++ b/gnuradio-runtime/lib/buffer.cc @@ -12,6 +12,7 @@ #include "config.h" #endif #include "vmcircbuf.h" +#include <gnuradio/block.h> #include <gnuradio/buffer.h> #include <gnuradio/buffer_double_mapped.h> #include <gnuradio/buffer_reader.h> @@ -56,10 +57,11 @@ static long s_buffer_count = 0; // counts for debugging storage mgmt ---------------------------------------------------------------------------- */ -buffer::buffer(BufferMappingType buf_type, +buffer::buffer(buffer_mapping_type buf_type, int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, block_sptr link) : d_base(0), d_bufsize(0), @@ -76,7 +78,9 @@ buffer::buffer(BufferMappingType buf_type, d_callback_flag(false), d_active_pointer_counter(0), d_downstream_lcm_nitems(downstream_lcm_nitems), - d_write_multiple(0) + d_write_multiple(0), + d_max_reader_output_multiple(downstream_max_out_mult), + d_context(buffer_context::DEFAULT_INVALID) { gr::configure_default_loggers(d_logger, d_debug_logger, "buffer"); @@ -86,44 +90,27 @@ buffer::buffer(BufferMappingType buf_type, buffer_sptr make_buffer(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, block_sptr link, block_sptr buf_owner) { #ifdef BUFFER_DEBUG - // BUFFER DEBUG gr::logger_ptr logger; gr::logger_ptr debug_logger; gr::configure_default_loggers(logger, debug_logger, "make_buffer"); std::ostringstream msg; #endif -#if DEBUG_SINGLE_MAPPED - if (1) { -#else - if (buf_owner->get_buffer_type() != buftype_DEFAULT_NON_CUSTOM::get()) { -#endif - // Buffer type is NOT the default non custom variety so allocate a - // buffer_single_mapped instance -#ifdef BUFFER_DEBUG - msg << "buffer_single_mapped nitems: " << nitems - << " -- sizeof_item: " << sizeof_item; - GR_LOG_DEBUG(logger, msg.str()); -#endif - - return buffer_sptr(new buffer_single_mapped( - nitems, sizeof_item, downstream_lcm_nitems, link, buf_owner)); - - } else { - // Default to allocating a buffer_double_mapped instance -#ifdef BUFFER_DEBUG - msg << "buffer_double_mapped nitems: " << nitems - << " -- sizeof_item: " << sizeof_item; - GR_LOG_DEBUG(logger, msg.str()); -#endif - - return buffer_sptr( - new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link)); - } + // NOTE: This function is no longer called by flat_flowgraph functions and + // therefore is somewhat deprecated. It will create and return a + // buffer_double_mapped subclass by default. + buffer_type buftype = buffer_double_mapped::type; + return buftype.make_buffer(nitems, + sizeof_item, + downstream_lcm_nitems, + downstream_max_out_mult, + link, + buf_owner); } buffer::~buffer() @@ -134,12 +121,16 @@ buffer::~buffer() void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; } +const void* buffer::_read_pointer(unsigned int read_index) +{ + return &d_base[read_index * d_sizeof_item]; +} + void buffer::update_write_pointer(int nitems) { gr::thread::scoped_lock guard(*mutex()); #ifdef BUFFER_DEBUG - // BUFFER DEBUG unsigned orig_wr_idx = d_write_index; #endif @@ -147,7 +138,6 @@ void buffer::update_write_pointer(int nitems) d_abs_write_offset += nitems; #ifdef BUFFER_DEBUG - // BUFFER DEBUG std::ostringstream msg; msg << "[" << this << "] update_write_pointer -- orig d_write_index: " << orig_wr_idx << " -- nitems: " << nitems << " -- d_write_index: " << d_write_index; @@ -266,4 +256,21 @@ std::ostream& operator<<(std::ostream& os, const buffer& buf) return os; } +void buffer::set_context(const buffer_context& context) +{ + if ((d_context == buffer_context::DEFAULT_INVALID) || (d_context == context)) { + // Set the context if the existing value is the default or if it is the + // same as what's already been set + d_context = context; + } else { + // Otherwise error out as the context value cannot be changed after + // it is set + std::ostringstream msg; + msg << "Block: " << link()->identifier() << " has context " << d_context + << " assigned. Cannot change to context " << context << "."; + GR_LOG_ERROR(d_logger, msg.str()); + throw std::runtime_error(msg.str()); + } +} + } /* namespace gr */ diff --git a/gnuradio-runtime/lib/buffer_context.cc b/gnuradio-runtime/lib/buffer_context.cc new file mode 100644 index 0000000000..9cd27add36 --- /dev/null +++ b/gnuradio-runtime/lib/buffer_context.cc @@ -0,0 +1,32 @@ +/* -*- c++ -*- */ +/* + * Copyright 2021 BlackLynx, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ +#include <gnuradio/buffer_context.h> + +namespace gr { + +std::ostream& operator<<(std::ostream& os, const buffer_context& context) +{ + switch (context) { + case buffer_context::DEFAULT_INVALID: + return os << "DEFAULT_INVALID"; + case buffer_context::HOST_TO_DEVICE: + return os << "HOST_TO_DEVICE"; + case buffer_context::DEVICE_TO_HOST: + return os << "DEVICE_TO_HOST"; + case buffer_context::HOST_TO_HOST: + return os << "HOST_TO_HOST"; + case buffer_context::DEVICE_TO_DEVICE: + return os << "DEVICE_TO_DEVICE"; + default: + return os << "Unknown buffer context: " << static_cast<int>(context); + } +} + +} // namespace gr diff --git a/gnuradio-runtime/lib/buffer_double_mapped.cc b/gnuradio-runtime/lib/buffer_double_mapped.cc index ad99c2162b..700ebac0c1 100644 --- a/gnuradio-runtime/lib/buffer_double_mapped.cc +++ b/gnuradio-runtime/lib/buffer_double_mapped.cc @@ -36,23 +36,25 @@ static inline long minimum_buffer_items(long type_size, long page_size) return page_size / GR_GCD(type_size, page_size); } +buffer_type buffer_double_mapped::type(buftype_DEFAULT_NON_CUSTOM{}); buffer_double_mapped::buffer_double_mapped(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, block_sptr link) - : buffer(BufferMappingType::DoubleMapped, + : buffer(buffer_mapping_type::double_mapped, nitems, sizeof_item, downstream_lcm_nitems, + downstream_max_out_mult, link) { gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_double_mapped"); - if (!allocate_buffer(nitems, sizeof_item)) + if (!allocate_buffer(nitems)) throw std::bad_alloc(); #ifdef BUFFER_DEBUG - // BUFFER DEBUG { std::ostringstream msg; msg << "[" << this << "] " @@ -62,13 +64,18 @@ buffer_double_mapped::buffer_double_mapped(int nitems, #endif } +// NB: Added the extra 'block_sptr unused' parameter so that the +// call signature matches the other factory-like functions used to create +// the buffer_single_mapped subclasses buffer_sptr make_buffer_double_mapped(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, - block_sptr link) + uint32_t downstream_max_out_mult, + block_sptr link, + block_sptr unused) { - return buffer_sptr( - new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link)); + return buffer_sptr(new buffer_double_mapped( + nitems, sizeof_item, downstream_lcm_nitems, downstream_max_out_mult, link)); } buffer_double_mapped::~buffer_double_mapped() {} @@ -77,13 +84,13 @@ buffer_double_mapped::~buffer_double_mapped() {} * sets d_vmcircbuf, d_base, d_bufsize. * returns true iff successful. */ -bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item) +bool buffer_double_mapped::allocate_buffer(int nitems) { int orig_nitems = nitems; // Any buffer size we come up with must be a multiple of min_nitems. int granularity = gr::vmcircbuf_sysconfig::granularity(); - int min_nitems = minimum_buffer_items(sizeof_item, granularity); + int min_nitems = minimum_buffer_items(d_sizeof_item, granularity); // Round-up nitems to a multiple of min_nitems. if (nitems % min_nitems != 0) @@ -91,7 +98,7 @@ bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item) // If we rounded-up a whole bunch, give the user a heads up. // This only happens if sizeof_item is not a power of two. - if (nitems > 2 * orig_nitems && nitems * (int)sizeof_item > granularity) { + if (nitems > 2 * orig_nitems && nitems * (int)d_sizeof_item > granularity) { auto msg = str(boost::format( "allocate_buffer: tried to allocate" @@ -99,7 +106,7 @@ bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item) " %d were allocated. If this isn't OK, consider padding" " your structure to a power-of-two bytes." " On this platform, our allocation granularity is %d bytes.") % - orig_nitems % sizeof_item % nitems % granularity); + orig_nitems % d_sizeof_item % nitems % granularity); GR_LOG_WARN(d_logger, msg.c_str()); } @@ -138,7 +145,6 @@ int buffer_double_mapped::space_available() } #ifdef BUFFER_DEBUG - // BUFFER DEBUG std::ostringstream msg; msg << "[" << this << "] " << "space_available() called d_write_index: " << d_write_index diff --git a/gnuradio-runtime/lib/buffer_reader.cc b/gnuradio-runtime/lib/buffer_reader.cc index 7ead53032e..b1c1e7fb73 100644 --- a/gnuradio-runtime/lib/buffer_reader.cc +++ b/gnuradio-runtime/lib/buffer_reader.cc @@ -34,11 +34,11 @@ buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay buffer_reader_sptr r; - if (buf->get_mapping_type() == BufferMappingType::DoubleMapped) { + if (buf->get_mapping_type() == buffer_mapping_type::double_mapped) { r.reset(new buffer_reader( buf, buf->index_sub(buf->d_write_index, nzero_preload), link)); r->declare_sample_delay(delay); - } else if (buf->get_mapping_type() == BufferMappingType::SingleMapped) { + } else if (buf->get_mapping_type() == buffer_mapping_type::single_mapped) { r.reset(new buffer_reader_sm( buf, buf->index_sub(buf->d_write_index, nzero_preload), link)); r->declare_sample_delay(delay); @@ -51,11 +51,15 @@ buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay buf->d_readers.push_back(r.get()); #ifdef BUFFER_DEBUG - // BUFFER DEBUG - std::cerr << " [" << buf.get() << ";" << r.get() - << "] buffer_add_reader() nzero_preload " << nzero_preload - << " -- delay: " << delay << " -- history: " << link->history() - << " -- RD_idx: " << r->d_read_index << std::endl; + gr::logger_ptr logger; + gr::logger_ptr debug_logger; + gr::configure_default_loggers(logger, debug_logger, "buffer_add_reader"); + + std::ostringstream msg; + msg << " [" << buf.get() << ";" << r.get() << "] buffer_add_reader() nzero_preload " + << nzero_preload << " -- delay: " << delay << " -- history: " << link->history() + << " -- RD_idx: " << r->d_read_index; + GR_LOG_DEBUG(debug_logger, msg.str()); #endif return r; @@ -89,12 +93,11 @@ void buffer_reader::declare_sample_delay(unsigned delay) unsigned buffer_reader::sample_delay() const { return d_attr_delay; } -int buffer_reader::items_available() // const +int buffer_reader::items_available() const { int available = d_buffer->index_sub(d_buffer->d_write_index, d_read_index); #ifdef BUFFER_DEBUG - // BUFFER DEBUG std::ostringstream msg; msg << "[" << d_buffer << ";" << this << "] " << "items_available() WR_idx: " << d_buffer->d_write_index @@ -109,7 +112,8 @@ int buffer_reader::items_available() // const const void* buffer_reader::read_pointer() { - return &d_buffer->d_base[d_read_index * d_buffer->d_sizeof_item]; + // Delegate to buffer subclass + return d_buffer->_read_pointer(d_read_index); } void buffer_reader::update_read_pointer(int nitems) @@ -117,7 +121,6 @@ void buffer_reader::update_read_pointer(int nitems) gr::thread::scoped_lock guard(*mutex()); #ifdef BUFFER_DEBUG - // BUFFER DEBUG unsigned orig_rd_idx = d_read_index; #endif @@ -125,7 +128,6 @@ void buffer_reader::update_read_pointer(int nitems) d_abs_read_offset += nitems; #ifdef BUFFER_DEBUG - // BUFFER DEBUG std::ostringstream msg; msg << "[" << d_buffer << ";" << this << "] update_read_pointer -- orig d_read_index: " << orig_rd_idx diff --git a/gnuradio-runtime/lib/buffer_reader_sm.cc b/gnuradio-runtime/lib/buffer_reader_sm.cc index 9e0fac584f..c0d4b07d65 100644 --- a/gnuradio-runtime/lib/buffer_reader_sm.cc +++ b/gnuradio-runtime/lib/buffer_reader_sm.cc @@ -26,13 +26,14 @@ namespace gr { buffer_reader_sm::~buffer_reader_sm() {} -int buffer_reader_sm::items_available() +int buffer_reader_sm::items_available() const { int available = 0; if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) { if (d_buffer->d_write_index == d_read_index) { - if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) { + if ((nitems_read() - sample_delay()) != + (d_buffer->nitems_written() + link()->history() - 1)) { available = d_buffer->d_bufsize - d_read_index; } } else { @@ -55,87 +56,15 @@ int buffer_reader_sm::items_available() bool buffer_reader_sm::input_blkd_cb_ready(int items_required) const { - gr::thread::scoped_lock(*d_buffer->mutex()); - - return (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) && - (d_buffer->d_write_index < d_read_index)); + return d_buffer->input_blkd_cb_ready(items_required, d_read_index); } bool buffer_reader_sm::input_blocked_callback(int items_required, int items_avail) { - // Maybe adjust read pointers from min read index? - // This would mean that *all* readers must be > (passed) the write index - if (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) && - (d_buffer->d_write_index < d_read_index)) { - - // Update items available before going farther as it could be stale - items_avail = items_available(); - - // Find reader with the smallest read index that is greater than the - // write index - uint32_t min_reader_index = std::numeric_limits<uint32_t>::max(); - uint32_t min_read_idx = std::numeric_limits<uint32_t>::max(); - for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) { - if (d_buffer->d_readers[idx]->d_read_index > d_buffer->d_write_index) { - // Record index of reader with minimum read-index - if (d_buffer->d_readers[idx]->d_read_index < min_read_idx) { - min_read_idx = d_buffer->d_readers[idx]->d_read_index; - min_reader_index = idx; - } - } - } - - // Note items_avail might be zero, that's okay. - items_avail += d_read_index - min_read_idx; - int gap = min_read_idx - d_buffer->d_write_index; - if (items_avail > gap) { - return false; - } - -#ifdef BUFFER_DEBUG - std::ostringstream msg; - msg << "[" << d_buffer << ";" << this << "] " - << "input_blocked_callback() WR_idx: " << d_buffer->d_write_index - << " -- WR items: " << d_buffer->nitems_written() - << " -- BUFSIZE: " << d_buffer->d_bufsize << " -- RD_idx: " << min_read_idx; - for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) { - if (idx != min_reader_index) { - msg << " -- OTHER_RDR: " << d_buffer->d_readers[idx]->d_read_index; - } - } - - msg << " -- GAP: " << gap << " -- items_required: " << items_required - << " -- items_avail: " << items_avail; - GR_LOG_DEBUG(d_logger, msg.str()); -#endif - - // Shift existing data down to make room for blocked data at end of buffer - uint32_t move_data_size = d_buffer->d_write_index * d_buffer->d_sizeof_item; - char* dest = d_buffer->d_base + (items_avail * d_buffer->d_sizeof_item); - std::memmove(dest, d_buffer->d_base, move_data_size); - - // Next copy the data from the end of the buffer back to the beginning - uint32_t avail_data_size = items_avail * d_buffer->d_sizeof_item; - char* src = d_buffer->d_base + (min_read_idx * d_buffer->d_sizeof_item); - std::memcpy(d_buffer->d_base, src, avail_data_size); - - // Now adjust write pointer - d_buffer->d_write_index += items_avail; - - // Finally adjust all reader pointers - for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) { - if (idx == min_reader_index) { - d_buffer->d_readers[idx]->d_read_index = 0; - } else { - d_buffer->d_readers[idx]->d_read_index += items_avail; - d_buffer->d_readers[idx]->d_read_index %= d_buffer->d_bufsize; - } - } - - return true; - } + // Update items available before going farther as it could be stale + items_avail = items_available(); - return false; + return d_buffer->input_blocked_callback(items_required, items_avail, d_read_index); } buffer_reader_sm::buffer_reader_sm(buffer_sptr buffer, diff --git a/gnuradio-runtime/lib/buffer_single_mapped.cc b/gnuradio-runtime/lib/buffer_single_mapped.cc index 6024396557..db7959c5b4 100644 --- a/gnuradio-runtime/lib/buffer_single_mapped.cc +++ b/gnuradio-runtime/lib/buffer_single_mapped.cc @@ -19,6 +19,8 @@ #include <gnuradio/thread/thread.h> #include <assert.h> #include <algorithm> +#include <cstdlib> +#include <cstring> #include <iostream> #include <stdexcept> @@ -27,30 +29,18 @@ namespace gr { buffer_single_mapped::buffer_single_mapped(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, block_sptr link, block_sptr buf_owner) - : buffer(BufferMappingType::SingleMapped, + : buffer(buffer_mapping_type::single_mapped, nitems, sizeof_item, downstream_lcm_nitems, + downstream_max_out_mult, link), d_buf_owner(buf_owner), - d_buffer(nullptr, - std::bind(&buffer_single_mapped::deleter, this, std::placeholders::_1)) + d_buffer(nullptr) { - gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_single_mapped"); - if (!allocate_buffer(nitems, sizeof_item, downstream_lcm_nitems)) - throw std::bad_alloc(); - -#ifdef BUFFER_DEBUG - // BUFFER DEBUG - { - std::ostringstream msg; - msg << "[" << this << "] " - << "buffer_single_mapped constructor -- history: " << link->history(); - GR_LOG_DEBUG(d_logger, msg.str()); - } -#endif } buffer_single_mapped::~buffer_single_mapped() {} @@ -59,14 +49,21 @@ buffer_single_mapped::~buffer_single_mapped() {} * Allocates underlying buffer. * returns true iff successful. */ -bool buffer_single_mapped::allocate_buffer(int nitems, - size_t sizeof_item, - uint64_t downstream_lcm_nitems) +bool buffer_single_mapped::allocate_buffer(int nitems) { #ifdef BUFFER_DEBUG int orig_nitems = nitems; #endif + // For single mapped buffers resize the initial size to be at least four + // times the size of the largest of any downstream block's output multiple. + // This helps reduce the number of times the input_block_callback() might be + // called which should help overall performance (particularly if the max + // output multiple is large) at the cost of slightly more buffer space. + if (static_cast<uint32_t>(nitems) < (4 * d_max_reader_output_multiple)) { + nitems = 4 * d_max_reader_output_multiple; + } + // Unlike the double mapped buffer case that can easily wrap back onto itself // for both reads and writes the single mapped case needs to be aware of read // and write granularity and size the underlying buffer accordingly. Otherwise @@ -105,33 +102,37 @@ bool buffer_single_mapped::allocate_buffer(int nitems, #endif // Adjust size so output buffer size is a multiple of the write granularity - if (write_granularity != 1 || downstream_lcm_nitems != 1) { - uint64_t size_align_adjust = GR_LCM(write_granularity, downstream_lcm_nitems); + if (write_granularity != 1 || d_downstream_lcm_nitems != 1) { + uint64_t size_align_adjust = GR_LCM(write_granularity, d_downstream_lcm_nitems); uint64_t remainder = nitems % size_align_adjust; nitems += (remainder > 0) ? (size_align_adjust - remainder) : 0; #ifdef BUFFER_DEBUG std::ostringstream msg; msg << "allocate_buffer()** called nitems: " << orig_nitems - << " -- read_multiple: " << downstream_lcm_nitems + << " -- read_multiple: " << d_downstream_lcm_nitems << " -- write_multiple: " << write_granularity << " -- NEW nitems: " << nitems; GR_LOG_DEBUG(d_logger, msg.str()); #endif } - // Allocate a new custom buffer from the owning block - char* buf = buf_owner()->allocate_custom_buffer(nitems * sizeof_item); - assert(buf != nullptr); - d_buffer.reset(buf); - - d_base = d_buffer.get(); d_bufsize = nitems; - d_downstream_lcm_nitems = downstream_lcm_nitems; + d_downstream_lcm_nitems = d_downstream_lcm_nitems; d_write_multiple = write_granularity; - return true; + // Do the actual allocation(s) with the finalized nitems + return do_allocate_buffer(nitems, d_sizeof_item); +} + +bool buffer_single_mapped::input_blkd_cb_ready(int items_required, + unsigned int read_index) +{ + gr::thread::scoped_lock(*this->mutex()); + + return (((d_bufsize - read_index) < (uint32_t)items_required) && + (d_write_index < read_index)); } bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple) @@ -145,70 +146,6 @@ bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple) ((space_avail / output_multiple) * output_multiple == 0)); } - -bool buffer_single_mapped::output_blocked_callback(int output_multiple, bool force) -{ - uint32_t space_avail = space_available(); - -#ifdef BUFFER_DEBUG - std::ostringstream msg; - msg << "[" << this << "] " - << "output_blocked_callback()*** WR_idx: " << d_write_index - << " -- WR items: " << nitems_written() - << " -- output_multiple: " << output_multiple - << " -- space_avail: " << space_avail << " -- force: " << force; - GR_LOG_DEBUG(d_logger, msg.str()); -#endif - - if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) || - force) { - // Find reader with the smallest read index - uint32_t min_read_idx = d_readers[0]->d_read_index; - for (size_t idx = 1; idx < d_readers.size(); ++idx) { - // Record index of reader with minimum read-index - if (d_readers[idx]->d_read_index < min_read_idx) { - min_read_idx = d_readers[idx]->d_read_index; - } - } - -#ifdef BUFFER_DEBUG - std::ostringstream msg; - msg << "[" << this << "] " - << "output_blocked_callback() WR_idx: " << d_write_index - << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx - << " -- shortcircuit: " - << ((min_read_idx == 0) || (min_read_idx >= d_write_index)) - << " -- to_move_items: " << (d_write_index - min_read_idx) - << " -- space_avail: " << space_avail << " -- force: " << force; - GR_LOG_DEBUG(d_logger, msg.str()); -#endif - - // Make sure we have enough room to start writing back at the beginning - if ((min_read_idx == 0) || (min_read_idx >= d_write_index)) { - return false; - } - - // Determine how much "to be read" data needs to be moved - int to_move_items = d_write_index - min_read_idx; - assert(to_move_items > 0); - uint32_t to_move_bytes = to_move_items * d_sizeof_item; - - // Shift "to be read" data back to the beginning of the buffer - std::memmove(d_base, d_base + (min_read_idx * d_sizeof_item), to_move_bytes); - - // Adjust write index and each reader index - d_write_index -= min_read_idx; - - for (size_t idx = 0; idx < d_readers.size(); ++idx) { - d_readers[idx]->d_read_index -= min_read_idx; - } - - return true; - } - - return false; -} - int buffer_single_mapped::space_available() { if (d_readers.empty()) @@ -233,7 +170,7 @@ int buffer_single_mapped::space_available() // For single mapped buffer there is no wrapping beyond the end of the // buffer #ifdef BUFFER_DEBUG - int thecase = 4; // REMOVE ME - just for debug + int thecase = 0; #endif int space = d_bufsize - d_write_index; @@ -259,15 +196,17 @@ int buffer_single_mapped::space_available() #endif space = min_read_index - d_write_index; // Leave extra space in case the reader gets stuck and needs realignment - { - if ((d_write_index > (d_bufsize / 2)) || - (min_read_index < (d_bufsize / 2))) { + if (d_max_reader_output_multiple > 1) { + if (static_cast<uint32_t>(space) > d_max_reader_output_multiple) { #ifdef BUFFER_DEBUG - thecase = 17; + thecase = 4; #endif - space = 0; + space = space - d_max_reader_output_multiple; } else { - space = (d_bufsize / 2) - d_write_index; +#ifdef BUFFER_DEBUG + thecase = 5; +#endif + space = 0; } } } @@ -278,7 +217,6 @@ int buffer_single_mapped::space_available() } #ifdef BUFFER_DEBUG - // BUFFER DEBUG std::ostringstream msg; msg << "[" << this << "] " << "space_available() called (case: " << thecase @@ -286,6 +224,7 @@ int buffer_single_mapped::space_available() << " -- min_read_index: " << min_read_index << " (" << min_idx_reader->nitems_read() << ") " << " -- space: " << space + << " -- max_reader_out_mult: " << d_max_reader_output_multiple << " (sample delay: " << min_idx_reader->sample_delay() << ")"; GR_LOG_DEBUG(d_logger, msg.str()); #endif @@ -294,4 +233,198 @@ int buffer_single_mapped::space_available() } } +void buffer_single_mapped::update_reader_block_history(unsigned history, int delay) +{ + unsigned old_max = d_max_reader_history; + d_max_reader_history = std::max(d_max_reader_history, history); + if (d_max_reader_history != old_max) { + d_write_index = d_max_reader_history - 1; + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "buffer_single_mapped constructor -- set wr index to: " << d_write_index; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Reset the reader's read index if the buffer's write index has changed. + // Note that "history - 1" is the nzero_preload value passed to + // buffer_add_reader. + for (auto reader : d_readers) { + reader->d_read_index = d_write_index - (reader->link()->history() - 1); + } + } + + // Only attempt to set has history flag if it is not already set + if (!d_has_history) { + // Blocks that set delay may set history to delay + 1 but this is + // not "real" history + d_has_history = ((static_cast<int>(history) - 1) != delay); + } +} + +//------------------------------------------------------------------------------ + +bool buffer_single_mapped::input_blocked_callback_logic(int items_required, + int items_avail, + unsigned read_index, + char* buffer_ptr, + memcpy_func_t memcpy_func, + memmove_func_t memmove_func) +{ +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "input_blocked_callback() WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() << " -- BUFSIZE: " << d_bufsize + << " -- RD_idx: " << read_index << " -- items_required: " << items_required + << " -- items_avail: " << items_avail; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Maybe adjust read pointers from min read index? + // This would mean that *all* readers must be > (passed) the write index + if (((d_bufsize - read_index) < (uint32_t)items_required) && + (d_write_index < read_index)) { + + // Find reader with the smallest read index that is greater than the + // write index + uint32_t min_reader_index = std::numeric_limits<uint32_t>::max(); + uint32_t min_read_idx = std::numeric_limits<uint32_t>::max(); + for (size_t idx = 0; idx < d_readers.size(); ++idx) { + if (d_readers[idx]->d_read_index > d_write_index) { + // Record index of reader with minimum read-index + if (d_readers[idx]->d_read_index < min_read_idx) { + min_read_idx = d_readers[idx]->d_read_index; + min_reader_index = idx; + } + } + } + + // Note items_avail might be zero, that's okay. + items_avail += read_index - min_read_idx; + int gap = min_read_idx - d_write_index; + if (items_avail > gap) { + return false; + } + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "input_blocked_callback() WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() << " -- BUFSIZE: " << d_bufsize + << " -- RD_idx: " << min_read_idx; + for (size_t idx = 0; idx < d_readers.size(); ++idx) { + if (idx != min_reader_index) { + msg << " -- OTHER_RDR: " << d_readers[idx]->d_read_index; + } + } + + msg << " -- GAP: " << gap << " -- items_required: " << items_required + << " -- items_avail: " << items_avail; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Shift existing data down to make room for blocked data at end of buffer + uint32_t move_data_size = d_write_index * d_sizeof_item; + char* dest = buffer_ptr + (items_avail * d_sizeof_item); + memmove_func(dest, buffer_ptr, move_data_size); + + // Next copy the data from the end of the buffer back to the beginning + uint32_t avail_data_size = items_avail * d_sizeof_item; + char* src = buffer_ptr + (min_read_idx * d_sizeof_item); + memcpy_func(buffer_ptr, src, avail_data_size); + + // Now adjust write pointer + d_write_index += items_avail; + + // Finally adjust all reader pointers + for (size_t idx = 0; idx < d_readers.size(); ++idx) { + if (idx == min_reader_index) { + d_readers[idx]->d_read_index = 0; + } else { + d_readers[idx]->d_read_index += items_avail; + d_readers[idx]->d_read_index %= d_bufsize; + } + } + + return true; + } + + return false; +} + +bool buffer_single_mapped::output_blocked_callback_logic(int output_multiple, + bool force, + char* buffer_ptr, + memmove_func_t memmove_func) +{ + uint32_t space_avail = space_available(); + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "output_blocked_callback()*** WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() + << " -- output_multiple: " << output_multiple + << " -- space_avail: " << space_avail << " -- force: " << force; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) || + force) { + // Find reader with the smallest read index + uint32_t min_read_idx = d_readers[0]->d_read_index; + uint64_t min_read_idx_nitems = d_readers[0]->nitems_read(); + for (size_t idx = 1; idx < d_readers.size(); ++idx) { + // Record index of reader with minimum read-index + if (d_readers[idx]->d_read_index < min_read_idx) { + min_read_idx = d_readers[idx]->d_read_index; + min_read_idx_nitems = d_readers[idx]->nitems_read(); + } + } + +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "output_blocked_callback() WR_idx: " << d_write_index + << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx + << " -- RD items: " << min_read_idx_nitems << " -- shortcircuit: " + << ((min_read_idx == 0) || (min_read_idx > d_write_index) || + (min_read_idx == d_write_index && + min_read_idx_nitems != nitems_written())) + << " -- to_move_items: " << (d_write_index - min_read_idx) + << " -- space_avail: " << space_avail << " -- force: " << force; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + // Make sure we have enough room to start writing back at the beginning + if ((min_read_idx == 0) || (min_read_idx > d_write_index) || + (min_read_idx == d_write_index && min_read_idx_nitems != nitems_written())) { + return false; + } + + // Determine how much "to be read" data needs to be moved + int to_move_items = d_write_index - min_read_idx; + if (to_move_items > 0) { + uint32_t to_move_bytes = to_move_items * d_sizeof_item; + + // Shift "to be read" data back to the beginning of the buffer + memmove_func( + buffer_ptr, buffer_ptr + (min_read_idx * d_sizeof_item), to_move_bytes); + } + + // Adjust write index and each reader index + d_write_index -= min_read_idx; + + for (size_t idx = 0; idx < d_readers.size(); ++idx) { + d_readers[idx]->d_read_index -= min_read_idx; + } + + return true; + } + + return false; +} + } /* namespace gr */ diff --git a/gnuradio-runtime/lib/buffer_type.cc b/gnuradio-runtime/lib/buffer_type.cc deleted file mode 100644 index b667eded1e..0000000000 --- a/gnuradio-runtime/lib/buffer_type.cc +++ /dev/null @@ -1,18 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2020 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * SPDX-License-Identifier: GPL-3.0-or-later - * - */ -#include <gnuradio/buffer_type.h> - -namespace gr { - -uint32_t buffer_type_base::s_nextId = 0; -std::mutex buffer_type_base::s_mutex; - - -} /* namespace gr */
\ No newline at end of file diff --git a/gnuradio-runtime/lib/flat_flowgraph.cc b/gnuradio-runtime/lib/flat_flowgraph.cc index b9986f8370..ef96ef21ac 100644 --- a/gnuradio-runtime/lib/flat_flowgraph.cc +++ b/gnuradio-runtime/lib/flat_flowgraph.cc @@ -15,6 +15,7 @@ #include "flat_flowgraph.h" #include <gnuradio/block_detail.h> #include <gnuradio/buffer.h> +#include <gnuradio/buffer_double_mapped.h> #include <gnuradio/buffer_reader.h> #include <gnuradio/buffer_type.h> #include <gnuradio/integer_math.h> @@ -87,6 +88,7 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block) // Determine the downstream max per output port std::vector<int> downstream_max_nitems(noutputs, 0); std::vector<uint64_t> downstream_lcm_nitems(noutputs, 1); + std::vector<uint32_t> downstream_max_out_mult(noutputs, 1); #ifdef BUFFER_DEBUG std::ostringstream msg; @@ -96,6 +98,7 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block) for (int i = 0; i < noutputs; i++) { int nitems = 0; uint64_t lcm_nitems = 1; + uint32_t max_out_multiple = 1; basic_block_vector_t downstream_blocks = calc_downstream_blocks(grblock, i); for (basic_block_viter_t blk = downstream_blocks.begin(); blk != downstream_blocks.end(); @@ -116,8 +119,8 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block) double decimation = (1.0 / dgrblock->relative_rate()); int multiple = dgrblock->output_multiple(); int history = dgrblock->history(); - nitems = - std::max(nitems, static_cast<int>(2 * (decimation * multiple + history))); + nitems = std::max( + nitems, static_cast<int>(2 * (decimation * multiple + (history - 1)))); // Calculate the LCM of downstream reader nitems #ifdef BUFFER_DEBUG @@ -131,6 +134,7 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block) (uint64_t)(dgrblock->fixed_rate_noutput_to_ninput(1) - (dgrblock->history() - 1))); } + if (dgrblock->relative_rate() != 1.0) { // Relative rate lcm_nitems = GR_LCM(lcm_nitems, dgrblock->relative_rate_d()); @@ -141,6 +145,10 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block) lcm_nitems = 1; } + if (static_cast<uint32_t>(multiple) > max_out_multiple) { + max_out_multiple = multiple; + } + #ifdef BUFFER_DEBUG msg.str(""); msg << " NINPUT_ITEMS: " << nitems; @@ -161,11 +169,15 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block) } downstream_max_nitems[i] = nitems; downstream_lcm_nitems[i] = lcm_nitems; + downstream_max_out_mult[i] = max_out_multiple; } // Allocate the block detail and necessary buffers - grblock->allocate_detail( - ninputs, noutputs, downstream_max_nitems, downstream_lcm_nitems); + grblock->allocate_detail(ninputs, + noutputs, + downstream_max_nitems, + downstream_lcm_nitems, + downstream_max_out_mult); } void flat_flowgraph::connect_block_inputs(basic_block_sptr block) @@ -189,26 +201,46 @@ void flat_flowgraph::connect_block_inputs(basic_block_sptr block) if (!src_grblock) throw std::runtime_error("connect_block_inputs found non-gr::block"); + // In order to determine the buffer context, we need to examine both + // the upstream and the downstream buffer_types + buffer_type src_buf_type = + src_grblock->output_signature()->stream_buffer_type(src_port); + buffer_type dest_buf_type = + grblock->input_signature()->stream_buffer_type(dst_port); + + buffer_context context; + if (src_buf_type == buffer_double_mapped::type && + dest_buf_type == buffer_double_mapped::type) { + context = buffer_context::HOST_TO_HOST; + } else if (src_buf_type != buffer_double_mapped::type && + dest_buf_type == buffer_double_mapped::type) { + context = buffer_context::DEVICE_TO_HOST; + } else if (src_buf_type == buffer_double_mapped::type && + dest_buf_type != buffer_double_mapped::type) { + context = buffer_context::HOST_TO_DEVICE; + } else if (src_buf_type != buffer_double_mapped::type && + dest_buf_type != buffer_double_mapped::type) { + context = buffer_context::DEVICE_TO_DEVICE; + } + buffer_sptr src_buffer; - buffer_type_t src_buf_type = src_grblock->get_buffer_type(); - buffer_type_t dest_buf_type = grblock->get_buffer_type(); - if (dest_buf_type == buftype_DEFAULT_NON_CUSTOM::get() || + if (dest_buf_type == buffer_double_mapped::type || dest_buf_type == src_buf_type) { // The block is not using a custom buffer OR the block and the upstream // block both use the same kind of custom buffer src_buffer = src_grblock->detail()->output(src_port); } else { - if (dest_buf_type != buftype_DEFAULT_NON_CUSTOM::get() && - src_buf_type == buftype_DEFAULT_NON_CUSTOM::get()) { + if (dest_buf_type != buffer_double_mapped::type && + src_buf_type == buffer_double_mapped::type) { // The block uses a custom buffer but the upstream block does not // therefore the upstream block's buffer can be replaced with the // type of buffer that the block needs std::ostringstream msg; msg << "Block: " << grblock->identifier() - << "replacing upstream block: " << src_grblock->identifier() + << " replacing upstream block: " << src_grblock->identifier() << " buffer with a custom buffer"; - GR_LOG_DEBUG(d_debug_logger, msg.str()); - src_buffer = src_grblock->replace_buffer(src_port, grblock); + GR_LOG_DEBUG(d_logger, msg.str()); + src_buffer = src_grblock->replace_buffer(src_port, dst_port, grblock); } else { // Both the block and upstream block use incompatible buffer types // which is not currently allowed @@ -223,9 +255,13 @@ void flat_flowgraph::connect_block_inputs(basic_block_sptr block) } } - GR_LOG_DEBUG(d_debug_logger, - "Setting input " + std::to_string(dst_port) + " from edge " + - (*e).identifier()); + // Set buffer's context + src_buffer->set_context(context); + + std::ostringstream msg; + msg << "Setting input " << dst_port << " from edge " << (*e).identifier() + << " context: " << context; + GR_LOG_DEBUG(d_debug_logger, msg.str()); detail->set_input(dst_port, buffer_add_reader(src_buffer, @@ -341,7 +377,6 @@ void flat_flowgraph::merge_connections(flat_flowgraph_sptr old_ffg) // Now deal with the fact that the block details might have // changed numbers of inputs and outputs vs. in the old // flowgraph. - block->detail()->reset_nitem_counters(); block->detail()->clear_tags(); } diff --git a/gnuradio-runtime/lib/host_buffer.cc b/gnuradio-runtime/lib/host_buffer.cc new file mode 100644 index 0000000000..6e1a1d1003 --- /dev/null +++ b/gnuradio-runtime/lib/host_buffer.cc @@ -0,0 +1,255 @@ +/* -*- c++ -*- */ +/* + * Copyright 2021 BlackLynx Inc.. + * + * SPDX-License-Identifier: GPL-3.0-or-later + */ + +#include <cstring> +#include <sstream> +#include <stdexcept> + +#include <gnuradio/block.h> +#include <gnuradio/host_buffer.h> + +namespace gr { + +buffer_type host_buffer::type(buftype_HOST_BUFFER{}); + +void* host_buffer::device_memcpy(void* dest, const void* src, std::size_t count) +{ + // There is no spoon...er... device so fake it out using regular memcpy + return std::memcpy(dest, src, count); +} + +void* host_buffer::device_memmove(void* dest, const void* src, std::size_t count) +{ + // There is no spoon...er... device so fake it out using regular memmmove + return std::memmove(dest, src, count); +} + + +host_buffer::host_buffer(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, + block_sptr link, + block_sptr buf_owner) + : buffer_single_mapped(nitems, + sizeof_item, + downstream_lcm_nitems, + downstream_max_out_mult, + link, + buf_owner), + d_device_base(nullptr) +{ + gr::configure_default_loggers(d_logger, d_debug_logger, "host_buffer"); + if (!allocate_buffer(nitems)) + throw std::bad_alloc(); +} + +host_buffer::~host_buffer() {} + +void host_buffer::post_work(int nitems) +{ +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "host_buffer [" << d_context << "] -- post_work: " << nitems; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + if (nitems <= 0) { + return; + } + + // NOTE: when this function is called the write pointer has not yet been + // advanced so it can be used directly as the source ptr + switch (d_context) { + case buffer_context::HOST_TO_DEVICE: { + // Copy data from host buffer to device buffer + void* dest_ptr = &d_device_base[d_write_index * d_sizeof_item]; + device_memcpy(dest_ptr, write_pointer(), nitems * d_sizeof_item); + } break; + + case buffer_context::DEVICE_TO_HOST: { + // Copy data from device buffer to host buffer + void* dest_ptr = &d_base[d_write_index * d_sizeof_item]; + device_memcpy(dest_ptr, write_pointer(), nitems * d_sizeof_item); + } break; + + case buffer_context::DEVICE_TO_DEVICE: + // No op + break; + + default: + std::ostringstream msg; + msg << "Unexpected context for host_buffer: " << d_context; + GR_LOG_ERROR(d_logger, msg.str()); + throw std::runtime_error(msg.str()); + } +} + +bool host_buffer::do_allocate_buffer(size_t final_nitems, size_t sizeof_item) +{ +#ifdef BUFFER_DEBUG + { + std::ostringstream msg; + msg << "[" << this << "] " + << "host_buffer constructor -- nitems: " << final_nitems; + GR_LOG_DEBUG(d_logger, msg.str()); + } +#endif + + // This is the host buffer + d_buffer.reset(new char[final_nitems * sizeof_item]()); + d_base = d_buffer.get(); + + // This is the simulated device buffer + d_device_buf.reset(new char[final_nitems * sizeof_item]()); + d_device_base = d_device_buf.get(); + + return true; +} + +void* host_buffer::write_pointer() +{ + void* ptr = nullptr; + switch (d_context) { + case buffer_context::HOST_TO_DEVICE: + // Write into host buffer + ptr = &d_base[d_write_index * d_sizeof_item]; + break; + + case buffer_context::DEVICE_TO_HOST: + case buffer_context::DEVICE_TO_DEVICE: + // Write into "device" buffer + ptr = &d_device_base[d_write_index * d_sizeof_item]; + break; + + default: + std::ostringstream msg; + msg << "Unexpected context for host_buffer: " << d_context; + GR_LOG_ERROR(d_logger, msg.str()); + throw std::runtime_error(msg.str()); + } + + return ptr; +} + +const void* host_buffer::_read_pointer(unsigned int read_index) +{ + void* ptr = nullptr; + switch (d_context) { + case buffer_context::HOST_TO_DEVICE: + case buffer_context::DEVICE_TO_DEVICE: + // Read from "device" buffer + ptr = &d_device_base[read_index * d_sizeof_item]; + break; + + case buffer_context::DEVICE_TO_HOST: + // Read from host buffer + ptr = &d_base[read_index * d_sizeof_item]; + break; + + default: + std::ostringstream msg; + msg << "Unexpected context for host_buffer: " << d_context; + GR_LOG_ERROR(d_logger, msg.str()); + throw std::runtime_error(msg.str()); + } + + return ptr; +} + +bool host_buffer::input_blocked_callback(int items_required, + int items_avail, + unsigned read_index) +{ +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "host_buffer [" << d_context << "] -- input_blocked_callback"; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + bool rc = false; + switch (d_context) { + case buffer_context::HOST_TO_DEVICE: + case buffer_context::DEVICE_TO_DEVICE: + // Adjust "device" buffer + rc = input_blocked_callback_logic(items_required, + items_avail, + read_index, + d_device_base, + host_buffer::device_memcpy, + host_buffer::device_memmove); + break; + + case buffer_context::DEVICE_TO_HOST: + case buffer_context::HOST_TO_HOST: + // Adjust host buffer + rc = input_blocked_callback_logic( + items_required, items_avail, read_index, d_base, std::memcpy, std::memmove); + break; + + default: + std::ostringstream msg; + msg << "Unexpected context for host_buffer: " << d_context; + GR_LOG_ERROR(d_logger, msg.str()); + throw std::runtime_error(msg.str()); + } + + return rc; +} + +bool host_buffer::output_blocked_callback(int output_multiple, bool force) +{ +#ifdef BUFFER_DEBUG + std::ostringstream msg; + msg << "[" << this << "] " + << "host_buffer [" << d_context << "] -- output_blocked_callback"; + GR_LOG_DEBUG(d_logger, msg.str()); +#endif + + bool rc = false; + switch (d_context) { + case buffer_context::HOST_TO_DEVICE: + case buffer_context::HOST_TO_HOST: + // Adjust host buffer + rc = output_blocked_callback_logic(output_multiple, force, d_base, std::memmove); + break; + + case buffer_context::DEVICE_TO_HOST: + case buffer_context::DEVICE_TO_DEVICE: + // Adjust "device" buffer + rc = output_blocked_callback_logic( + output_multiple, force, d_device_base, host_buffer::device_memmove); + break; + + default: + std::ostringstream msg; + msg << "Unexpected context for host_buffer: " << d_context; + GR_LOG_ERROR(d_logger, msg.str()); + throw std::runtime_error(msg.str()); + } + + return rc; +} + +buffer_sptr host_buffer::make_host_buffer(int nitems, + size_t sizeof_item, + uint64_t downstream_lcm_nitems, + uint32_t downstream_max_out_mult, + block_sptr link, + block_sptr buf_owner) +{ + return buffer_sptr(new host_buffer(nitems, + sizeof_item, + downstream_lcm_nitems, + downstream_max_out_mult, + link, + buf_owner)); +} + +} /* namespace gr */ diff --git a/gnuradio-runtime/lib/io_signature.cc b/gnuradio-runtime/lib/io_signature.cc index 507268d5ce..e953c7eb5a 100644 --- a/gnuradio-runtime/lib/io_signature.cc +++ b/gnuradio-runtime/lib/io_signature.cc @@ -22,47 +22,69 @@ gr::io_signature::sptr io_signature::makev(int min_streams, int max_streams, const std::vector<int>& sizeof_stream_items) { + gr_vector_buffer_type buftypes(sizeof_stream_items.size(), + buffer_double_mapped::type); return gr::io_signature::sptr( - new io_signature(min_streams, max_streams, sizeof_stream_items)); + new io_signature(min_streams, max_streams, sizeof_stream_items, buftypes)); } -gr::io_signature::sptr -io_signature::make(int min_streams, int max_streams, int sizeof_stream_item) +gr::io_signature::sptr io_signature::makev(int min_streams, + int max_streams, + const std::vector<int>& sizeof_stream_items, + gr_vector_buffer_type buftypes) { - std::vector<int> sizeof_items(1); - sizeof_items[0] = sizeof_stream_item; - return io_signature::makev(min_streams, max_streams, sizeof_items); + return gr::io_signature::sptr( + new io_signature(min_streams, max_streams, sizeof_stream_items, buftypes)); +} + +gr::io_signature::sptr io_signature::make(int min_streams, + int max_streams, + int sizeof_stream_item, + buffer_type buftype) +{ + std::vector<int> sizeof_items{ sizeof_stream_item }; + gr_vector_buffer_type buftypes{ buftype }; + return io_signature::makev(min_streams, max_streams, sizeof_items, buftypes); } gr::io_signature::sptr io_signature::make2(int min_streams, int max_streams, int sizeof_stream_item1, - int sizeof_stream_item2) + int sizeof_stream_item2, + buffer_type buftype1, + buffer_type buftype2) { - std::vector<int> sizeof_items(2); - sizeof_items[0] = sizeof_stream_item1; - sizeof_items[1] = sizeof_stream_item2; - return io_signature::makev(min_streams, max_streams, sizeof_items); + std::vector<int> sizeof_items{ sizeof_stream_item1, sizeof_stream_item2 }; + gr_vector_buffer_type buftypes{ buftype1, buftype2 }; + return io_signature::makev(min_streams, max_streams, sizeof_items, buftypes); } gr::io_signature::sptr io_signature::make3(int min_streams, int max_streams, int sizeof_stream_item1, int sizeof_stream_item2, - int sizeof_stream_item3) + int sizeof_stream_item3, + buffer_type buftype1, + buffer_type buftype2, + buffer_type buftype3) { - std::vector<int> sizeof_items(3); - sizeof_items[0] = sizeof_stream_item1; - sizeof_items[1] = sizeof_stream_item2; - sizeof_items[2] = sizeof_stream_item3; - return io_signature::makev(min_streams, max_streams, sizeof_items); + std::vector<int> sizeof_items{ sizeof_stream_item1, + sizeof_stream_item2, + sizeof_stream_item3 }; + gr_vector_buffer_type buftypes{ buftype1, buftype2, buftype3 }; + return io_signature::makev(min_streams, max_streams, sizeof_items, buftypes); } // ------------------------------------------------------------------------ io_signature::io_signature(int min_streams, int max_streams, - const std::vector<int>& sizeof_stream_items) + const std::vector<int>& sizeof_stream_items, + gr_vector_buffer_type buftypes) + : d_min_streams(min_streams), + d_max_streams(max_streams), + d_sizeof_stream_item(sizeof_stream_items), + d_stream_buffer_type(buftypes) { if (min_streams < 0 || (max_streams != IO_INFINITE && max_streams < min_streams)) throw std::invalid_argument("gr::io_signature(1)"); @@ -75,10 +97,6 @@ io_signature::io_signature(int min_streams, if (max_streams != 0 && sizeof_stream_items[i] < 1) throw std::invalid_argument("gr::io_signature(3)"); } - - d_min_streams = min_streams; - d_max_streams = max_streams; - d_sizeof_stream_item = sizeof_stream_items; } io_signature::~io_signature() {} @@ -97,4 +115,14 @@ std::vector<int> io_signature::sizeof_stream_items() const return d_sizeof_stream_item; } +buffer_type io_signature::stream_buffer_type(size_t index) const +{ + return d_stream_buffer_type[std::min(index, d_stream_buffer_type.size() - 1)]; +} + +gr_vector_buffer_type io_signature::stream_buffer_types() const +{ + return d_stream_buffer_type; +} + } /* namespace gr */ diff --git a/gnuradio-runtime/lib/qa_buffer.cc b/gnuradio-runtime/lib/qa_buffer.cc index cefe548338..1c97fd070d 100644 --- a/gnuradio-runtime/lib/qa_buffer.cc +++ b/gnuradio-runtime/lib/qa_buffer.cc @@ -43,7 +43,7 @@ static void t0_body() int counter = 0; gr::buffer_sptr buf( - gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr())); int last_sa; int sa; @@ -78,7 +78,7 @@ static void t1_body() int read_counter = 0; gr::buffer_sptr buf( - gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr())); gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr())); int sa; @@ -150,7 +150,7 @@ static void t2_body() int nitems = (64 * (1L << 10)) / sizeof(int); // 64K worth of ints gr::buffer_sptr buf( - gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr())); gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr())); int read_counter = 0; @@ -216,7 +216,7 @@ static void t3_body() static const int N = 5; gr::buffer_sptr buf( - gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr())); + gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr())); gr::buffer_reader_sptr reader[N]; int read_counter[N]; int write_counter = 0; diff --git a/gnuradio-runtime/lib/qa_host_buffer.cc b/gnuradio-runtime/lib/qa_host_buffer.cc new file mode 100644 index 0000000000..e8f29afe8d --- /dev/null +++ b/gnuradio-runtime/lib/qa_host_buffer.cc @@ -0,0 +1,334 @@ +/* -*- c++ -*- */ +/* + * Copyright 2021 BlackLynx Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include <config.h> +#endif + +#include <gnuradio/block.h> +#include <gnuradio/buffer_context.h> +#include <gnuradio/buffer_reader.h> +#include <gnuradio/host_buffer.h> +#include <gnuradio/random.h> +#include <boost/test/unit_test.hpp> +#include <cstdlib> +#include <iostream> + +// This is a trivial mocked up block inspired by gr::blocks::nop that is used +// only as a placeholder for testing host_buffer below. +class nop : public gr::block +{ +public: + typedef std::shared_ptr<nop> sptr; + static sptr make(size_t sizeof_stream_item) + { + return gnuradio::make_block_sptr<nop>(sizeof_stream_item); + } + + nop(size_t sizeof_stream_item) + : block("nop", + gr::io_signature::make(0, -1, sizeof_stream_item), + gr::io_signature::make(0, -1, sizeof_stream_item)) + { + } + + ~nop() override {} + + int general_work(int noutput_items, + gr_vector_int& ninput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items) override + { + // eat any input that's available + for (unsigned i = 0; i < ninput_items.size(); i++) + consume(i, ninput_items[i]); + + return noutput_items; + } +}; + + +// ---------------------------------------------------------------------------- +// Basic checks for buffer_single_mapped using the host_buffer implementation +// of the interface for testing. +// ---------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(t0) +{ + int nitems = 65536 / sizeof(int); + gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder + + gr::buffer_sptr buf( + gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop)); + buf->set_context(gr::buffer_context::HOST_TO_HOST); + + gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop)); + + BOOST_CHECK(buf->space_available() == nitems); + BOOST_CHECK(rdr1->items_available() == 0); + + for (int idx = 1; idx <= 16; ++idx) { + buf->update_write_pointer(1000); + BOOST_CHECK(buf->space_available() == (nitems - (idx * 1000))); + + BOOST_CHECK(rdr1->items_available() == (idx * 1000)); + } + + BOOST_CHECK(buf->space_available() == 384); + + buf->update_write_pointer(buf->space_available()); + BOOST_CHECK(buf->space_available() == 0); + BOOST_CHECK(rdr1->items_available() == nitems); + BOOST_CHECK(buf->space_available() == 0); +} + +// ---------------------------------------------------------------------------- +// Basic checks for buffer_single_mapped using the host_buffer implementation +// of the interface for testing. +// ---------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(t1) +{ + int nitems = 65536 / sizeof(int); + gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder + + gr::buffer_sptr buf( + gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop)); + buf->set_context(gr::buffer_context::HOST_TO_HOST); + + gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop)); + + int space = buf->space_available(); + BOOST_CHECK(nitems == space); + + BOOST_CHECK(rdr1->items_available() == 0); + + buf->update_write_pointer(nitems); + BOOST_CHECK(buf->space_available() == 0); + BOOST_CHECK(rdr1->items_available() == nitems); + + for (int idx = 1; idx <= 16; ++idx) { + rdr1->update_read_pointer(1000); + BOOST_CHECK(rdr1->items_available() == (nitems - (idx * 1000))); + + space = buf->space_available(); + BOOST_CHECK(space == (idx * 1000)); + } + + BOOST_CHECK(rdr1->items_available() == 384); + rdr1->update_read_pointer(384); + BOOST_CHECK(rdr1->items_available() == 0); +} + +// ---------------------------------------------------------------------------- +// Basic check reader/write wrapping of buffer_single_mapped with 1 reader. +// ---------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(t2) +{ + int nitems = 65536 / sizeof(int); + gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder + + gr::buffer_sptr buf( + gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop)); + buf->set_context(gr::buffer_context::HOST_TO_HOST); + + gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop)); + + int space = buf->space_available(); + BOOST_CHECK(nitems == space); + BOOST_CHECK(rdr1->items_available() == 0); + + buf->update_write_pointer(nitems); + BOOST_CHECK(buf->space_available() == 0); + + for (int idx = 1; idx <= 16; ++idx) { + rdr1->update_read_pointer(1000); + BOOST_CHECK(rdr1->items_available() == (nitems - (idx * 1000))); + + space = buf->space_available(); + + if (idx <= 9) + BOOST_CHECK(space == (idx * 1000)); + else + BOOST_CHECK(space == ((idx * 1000) - (nitems / 2))); + + if (idx == 9) { + buf->update_write_pointer(nitems / 2); + } + } + + // At this point we can only read up until the end of the buffer even though + // additional data is available at the beginning of the buffer + BOOST_CHECK(rdr1->items_available() == 384); + rdr1->update_read_pointer(384); + + // Now the (nitems / 2) at the beginning of the buffer should be available + BOOST_CHECK(rdr1->items_available() == (nitems / 2)); + + for (int idx = 0; idx < 4; ++idx) + rdr1->update_read_pointer(1024); + + BOOST_CHECK(buf->space_available() == (nitems / 2)); + BOOST_CHECK(rdr1->items_available() == (nitems / 4)); + + for (int idx = 0; idx < 4; ++idx) + rdr1->update_read_pointer(1000); + + BOOST_CHECK(buf->space_available() == (nitems / 2)); + BOOST_CHECK(rdr1->items_available() == 96); + + rdr1->update_read_pointer(96); + BOOST_CHECK(rdr1->items_available() == 0); + + BOOST_CHECK(buf->space_available() == (nitems / 2)); +} + +// ---------------------------------------------------------------------------- +// Basic check reader/write wrapping of buffer_single_mapped with 2 readers. +// ---------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(t3) +{ + int nitems = 65536 / sizeof(int); + gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder + + gr::buffer_sptr buf( + gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop)); + buf->set_context(gr::buffer_context::HOST_TO_HOST); + + gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop)); + gr::buffer_reader_sptr rdr2(gr::buffer_add_reader(buf, 0, nop)); + + int space = buf->space_available(); + BOOST_CHECK(nitems == space); + BOOST_CHECK(rdr1->items_available() == 0); + BOOST_CHECK(rdr2->items_available() == 0); + + buf->update_write_pointer(nitems); + BOOST_CHECK(buf->space_available() == 0); + BOOST_CHECK(rdr1->items_available() == nitems); + BOOST_CHECK(rdr2->items_available() == nitems); + + for (int idx = 1; idx <= 16; ++idx) { + rdr1->update_read_pointer(1000); + BOOST_CHECK(rdr1->items_available() == (nitems - (idx * 1000))); + + // Reader 2 hasn't read anything so space available should remain 0 + BOOST_CHECK(buf->space_available() == 0); + } + + int last_rdr1_available = rdr1->items_available(); + int increment = last_rdr1_available / 4; + + for (int idx = 1; idx <= 16; ++idx) { + rdr2->update_read_pointer(1000); + BOOST_CHECK(rdr2->items_available() == (nitems - (idx * 1000))); + + BOOST_CHECK(rdr1->items_available() == last_rdr1_available); + if (idx % 4 == 0) { + rdr1->update_read_pointer(increment); + BOOST_CHECK(rdr1->items_available() == (last_rdr1_available - increment)); + last_rdr1_available = rdr1->items_available(); + } + + BOOST_CHECK(buf->space_available() == (idx * 1000)); + } +} + +// ---------------------------------------------------------------------------- +// Basic check of output blocked callback +// ---------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(t4) +{ + int nitems = 65536 / sizeof(int); + gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder + + gr::buffer_sptr buf( + gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop)); + buf->set_context(gr::buffer_context::HOST_TO_HOST); + + gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop)); + + BOOST_CHECK(nitems == buf->space_available()); + BOOST_CHECK(rdr1->items_available() == 0); + + buf->update_write_pointer(nitems / 2); + BOOST_CHECK(buf->space_available() == (nitems / 2)); + BOOST_CHECK(rdr1->items_available() == (nitems / 2)); + + rdr1->update_read_pointer(nitems / 2); + BOOST_CHECK(buf->space_available() == (nitems / 2)); + BOOST_CHECK(rdr1->items_available() == 0); + + buf->update_write_pointer(8000); + BOOST_CHECK(buf->space_available() == 192); + + bool ready = buf->output_blkd_cb_ready(200); + BOOST_CHECK(ready == true); + + bool success = buf->output_blocked_callback(200); + BOOST_CHECK(success == true); + BOOST_CHECK(buf->space_available() == 8384); + BOOST_CHECK(rdr1->items_available() == 8000); + + rdr1->update_read_pointer(4000); + BOOST_CHECK(buf->space_available() == 8384); + BOOST_CHECK(rdr1->items_available() == 4000); + + buf->update_write_pointer(4000); + BOOST_CHECK(buf->space_available() == 4384); + BOOST_CHECK(rdr1->items_available() == 8000); + + rdr1->update_read_pointer(8000); + BOOST_CHECK(buf->space_available() == 4384); + BOOST_CHECK(rdr1->items_available() == 0); +} + +// ---------------------------------------------------------------------------- +// Basic check of input blocked callback +// ---------------------------------------------------------------------------- +BOOST_AUTO_TEST_CASE(t5) +{ + int nitems = 65536 / sizeof(int); + gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder + + gr::buffer_sptr buf( + gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop)); + buf->set_context(gr::buffer_context::HOST_TO_HOST); + + gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop)); + + BOOST_CHECK(nitems == buf->space_available()); + BOOST_CHECK(rdr1->items_available() == 0); + + buf->update_write_pointer(16000); + BOOST_CHECK(buf->space_available() == 384); + BOOST_CHECK(rdr1->items_available() == 16000); + + rdr1->update_read_pointer(16000); + BOOST_CHECK(buf->space_available() == 384); + BOOST_CHECK(rdr1->items_available() == 0); + + buf->update_write_pointer(384); + BOOST_CHECK(buf->space_available() == 16000); + BOOST_CHECK(rdr1->items_available() == 384); + + buf->update_write_pointer(116); + BOOST_CHECK(buf->space_available() == 15884); + BOOST_CHECK(rdr1->items_available() == 384); + + bool ready = rdr1->input_blkd_cb_ready(400); + BOOST_CHECK(ready == true); + + bool success = rdr1->input_blocked_callback(400, rdr1->items_available()); + BOOST_CHECK(success == true); + BOOST_CHECK(rdr1->items_available() == 500); + + rdr1->update_read_pointer(500); + BOOST_CHECK(buf->space_available() == 15884); + BOOST_CHECK(rdr1->items_available() == 0); +} diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt b/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt index dbc74b0529..2623bcf9c3 100644 --- a/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt +++ b/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt @@ -18,6 +18,7 @@ messages/msg_queue_python.cc # block_registry_python.cc buffer_python.cc buffer_reader_python.cc + buffer_type_python.cc constants_python.cc endianness_python.cc expj_python.cc diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc index 9660a7f245..4ba7742b15 100644 --- a/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc +++ b/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc @@ -14,7 +14,7 @@ /* BINDTOOL_GEN_AUTOMATIC(0) */ /* BINDTOOL_USE_PYGCCXML(0) */ /* BINDTOOL_HEADER_FILE(block.h) */ -/* BINDTOOL_HEADER_FILE_HASH(238d129ad018daa3146ff1d8867dc356) */ +/* BINDTOOL_HEADER_FILE_HASH(23fce54cc3292f62ca2551fb3f409f77) */ /***********************************************************************************/ #include <pybind11/complex.h> diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc index de7d4edf1c..4a991397be 100644 --- a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc +++ b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc @@ -14,7 +14,7 @@ /* BINDTOOL_GEN_AUTOMATIC(0) */ /* BINDTOOL_USE_PYGCCXML(0) */ /* BINDTOOL_HEADER_FILE(buffer.h) */ -/* BINDTOOL_HEADER_FILE_HASH(e5247f4fe5b5873c66eed72880194981) */ +/* BINDTOOL_HEADER_FILE_HASH(e34c34f70f65bbc7dfbc45adcadf7796) */ /***********************************************************************************/ #include <pybind11/complex.h> diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc index 23e2f39d10..2c86dd9fe6 100644 --- a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc +++ b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc @@ -14,7 +14,7 @@ /* BINDTOOL_GEN_AUTOMATIC(0) */ /* BINDTOOL_USE_PYGCCXML(0) */ /* BINDTOOL_HEADER_FILE(buffer_reader.h) */ -/* BINDTOOL_HEADER_FILE_HASH(451fcbd61f40b7d17a151474869aad75) */ +/* BINDTOOL_HEADER_FILE_HASH(5a48682a66afd451d2f145b352c95a7e) */ /***********************************************************************************/ #include <pybind11/complex.h> diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_type_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_type_python.cc new file mode 100644 index 0000000000..28bfcfac3c --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_type_python.cc @@ -0,0 +1,41 @@ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +/***********************************************************************************/ +/* This file is automatically generated using bindtool and can be manually edited */ +/* The following lines can be configured to regenerate this file during cmake */ +/* If manual edits are made, the following tags should be modified accordingly. */ +/* BINDTOOL_GEN_AUTOMATIC(0) */ +/* BINDTOOL_USE_PYGCCXML(0) */ +/* BINDTOOL_HEADER_FILE(buffer_type.h) */ +/* BINDTOOL_HEADER_FILE_HASH(0b679f644e232dd519bb812b93c8c0e3) */ +/***********************************************************************************/ + +#include <pybind11/complex.h> +#include <pybind11/pybind11.h> +#include <pybind11/stl.h> + +namespace py = pybind11; + +#include <gnuradio/buffer_type.h> +// pydoc.h is automatically generated in the build directory +#include <buffer_type_pydoc.h> + +// NOTE: buffer_type is really a typedef of const buffer_type_base& so +// buffer_type_base is used below because that's the type we really care about +void bind_buffer_type(py::module& m) +{ + + using buffer_type_base = ::gr::buffer_type_base; + + + py::class_<buffer_type_base>(m, "buffer_type_base", D(buffer_type_base)) + + .def("name", &buffer_type_base::name, D(buffer_type_base, name)); +}
\ No newline at end of file diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/docstrings/buffer_type_pydoc_template.h b/gnuradio-runtime/python/gnuradio/gr/bindings/docstrings/buffer_type_pydoc_template.h new file mode 100644 index 0000000000..cad9363030 --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/bindings/docstrings/buffer_type_pydoc_template.h @@ -0,0 +1,21 @@ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ +#include "pydoc_macros.h" +#define D(...) DOC(gr, __VA_ARGS__) +/* + This file contains placeholders for docstrings for the Python bindings. + Do not edit! These were automatically extracted during the binding process + and will be overwritten during the build process + */ + + +static const char* __doc_gr_buffer_type_base = R"doc()doc"; + + +static const char* __doc_gr_buffer_type_base_name = R"doc()doc"; diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/io_signature_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/io_signature_python.cc index 56ed5c2fe5..d6ddf570dd 100644 --- a/gnuradio-runtime/python/gnuradio/gr/bindings/io_signature_python.cc +++ b/gnuradio-runtime/python/gnuradio/gr/bindings/io_signature_python.cc @@ -14,7 +14,7 @@ /* BINDTOOL_GEN_AUTOMATIC(0) */ /* BINDTOOL_USE_PYGCCXML(0) */ /* BINDTOOL_HEADER_FILE(io_signature.h) */ -/* BINDTOOL_HEADER_FILE_HASH(aa246441b45c1a5b872509ed410615ae) */ +/* BINDTOOL_HEADER_FILE_HASH(baf27e696237b6542ec62a4c5627ea1d) */ /***********************************************************************************/ #include <pybind11/complex.h> @@ -40,6 +40,7 @@ void bind_io_signature(py::module& m) py::arg("min_streams"), py::arg("max_streams"), py::arg("sizeof_stream_item"), + py::arg("buftype") = gr::buffer_double_mapped::type, D(io_signature, make)) @@ -49,6 +50,8 @@ void bind_io_signature(py::module& m) py::arg("max_streams"), py::arg("sizeof_stream_item1"), py::arg("sizeof_stream_item2"), + py::arg("buftype1") = gr::buffer_double_mapped::type, + py::arg("buftype2") = gr::buffer_double_mapped::type, D(io_signature, make2)) @@ -59,14 +62,28 @@ void bind_io_signature(py::module& m) py::arg("sizeof_stream_item1"), py::arg("sizeof_stream_item2"), py::arg("sizeof_stream_item3"), + py::arg("buftype1") = gr::buffer_double_mapped::type, + py::arg("buftype2") = gr::buffer_double_mapped::type, + py::arg("buftype3") = gr::buffer_double_mapped::type, D(io_signature, make3)) + .def_static( + "makev", + py::overload_cast<int, int, const std::vector<int>&>(&io_signature::makev), + py::arg("min_streams"), + py::arg("max_streams"), + py::arg("sizeof_stream_items"), + D(io_signature, makev)) .def_static("makev", - &io_signature::makev, + py::overload_cast<int, + int, + const std::vector<int>&, + gr::gr_vector_buffer_type>(&io_signature::makev), py::arg("min_streams"), py::arg("max_streams"), py::arg("sizeof_stream_items"), + py::arg("buftypes"), D(io_signature, makev)) diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/python_bindings.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/python_bindings.cc index 1f39775f81..760578254d 100644 --- a/gnuradio-runtime/python/gnuradio/gr/bindings/python_bindings.cc +++ b/gnuradio-runtime/python/gnuradio/gr/bindings/python_bindings.cc @@ -29,6 +29,7 @@ void bind_block_detail(py::module&); void bind_block_gateway(py::module&); // void bind_block_registry(py::module&); void bind_buffer(py::module&); +void bind_buffer_type(py::module& m); void bind_constants(py::module&); void bind_endianness(py::module&); void bind_expj(py::module&); @@ -121,6 +122,7 @@ PYBIND11_MODULE(gr_python, m) bind_msg_handler(m); bind_msg_queue(m); + bind_buffer_type(m); bind_io_signature(m); // // bind_attributes(m); bind_basic_block(m); diff --git a/gr-audio/lib/portaudio/portaudio_sink.cc b/gr-audio/lib/portaudio/portaudio_sink.cc index ea27c7c0f9..607e69972f 100644 --- a/gr-audio/lib/portaudio/portaudio_sink.cc +++ b/gr-audio/lib/portaudio/portaudio_sink.cc @@ -66,7 +66,7 @@ void portaudio_sink::create_ringbuffer(void) // FYI, the buffer indices are in units of samples. d_writer = gr::make_buffer( - N_BUFFERS * bufsize_samples, sizeof(sample_t), N_BUFFERS * bufsize_samples); + N_BUFFERS * bufsize_samples, sizeof(sample_t), N_BUFFERS * bufsize_samples, 1); d_reader = gr::buffer_add_reader(d_writer, 0); } diff --git a/gr-audio/lib/portaudio/portaudio_source.cc b/gr-audio/lib/portaudio/portaudio_source.cc index 63a3d5e1ba..a4916009f2 100644 --- a/gr-audio/lib/portaudio/portaudio_source.cc +++ b/gr-audio/lib/portaudio/portaudio_source.cc @@ -65,7 +65,7 @@ void portaudio_source::create_ringbuffer(void) // FYI, the buffer indices are in units of samples. d_writer = gr::make_buffer( - N_BUFFERS * bufsize_samples, sizeof(sample_t), N_BUFFERS * bufsize_samples); + N_BUFFERS * bufsize_samples, sizeof(sample_t), N_BUFFERS * bufsize_samples, 1); d_reader = gr::buffer_add_reader(d_writer, 0); } |