summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gnuradio-runtime/include/gnuradio/CMakeLists.txt5
-rw-r--r--gnuradio-runtime/include/gnuradio/block.h60
-rw-r--r--gnuradio-runtime/include/gnuradio/buffer.h68
-rw-r--r--gnuradio-runtime/include/gnuradio/buffer_context.h30
-rw-r--r--gnuradio-runtime/include/gnuradio/buffer_double_mapped.h27
-rw-r--r--gnuradio-runtime/include/gnuradio/buffer_reader.h13
-rw-r--r--gnuradio-runtime/include/gnuradio/buffer_reader_sm.h7
-rw-r--r--gnuradio-runtime/include/gnuradio/buffer_single_mapped.h138
-rw-r--r--gnuradio-runtime/include/gnuradio/buffer_type.h94
-rw-r--r--gnuradio-runtime/include/gnuradio/custom_lock.h10
-rw-r--r--gnuradio-runtime/include/gnuradio/host_buffer.h126
-rw-r--r--gnuradio-runtime/include/gnuradio/io_signature.h54
-rw-r--r--gnuradio-runtime/lib/CMakeLists.txt4
-rw-r--r--gnuradio-runtime/lib/block.cc63
-rw-r--r--gnuradio-runtime/lib/block_detail.cc2
-rw-r--r--gnuradio-runtime/lib/block_executor.cc22
-rw-r--r--gnuradio-runtime/lib/buffer.cc71
-rw-r--r--gnuradio-runtime/lib/buffer_context.cc32
-rw-r--r--gnuradio-runtime/lib/buffer_double_mapped.cc28
-rw-r--r--gnuradio-runtime/lib/buffer_reader.cc26
-rw-r--r--gnuradio-runtime/lib/buffer_reader_sm.cc85
-rw-r--r--gnuradio-runtime/lib/buffer_single_mapped.cc337
-rw-r--r--gnuradio-runtime/lib/buffer_type.cc18
-rw-r--r--gnuradio-runtime/lib/flat_flowgraph.cc67
-rw-r--r--gnuradio-runtime/lib/host_buffer.cc255
-rw-r--r--gnuradio-runtime/lib/io_signature.cc72
-rw-r--r--gnuradio-runtime/lib/qa_buffer.cc8
-rw-r--r--gnuradio-runtime/lib/qa_host_buffer.cc334
-rw-r--r--gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt1
-rw-r--r--gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc2
-rw-r--r--gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc2
-rw-r--r--gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc2
-rw-r--r--gnuradio-runtime/python/gnuradio/gr/bindings/buffer_type_python.cc41
-rw-r--r--gnuradio-runtime/python/gnuradio/gr/bindings/docstrings/buffer_type_pydoc_template.h21
-rw-r--r--gnuradio-runtime/python/gnuradio/gr/bindings/io_signature_python.cc21
-rw-r--r--gnuradio-runtime/python/gnuradio/gr/bindings/python_bindings.cc2
-rw-r--r--gr-audio/lib/portaudio/portaudio_sink.cc2
-rw-r--r--gr-audio/lib/portaudio/portaudio_source.cc2
38 files changed, 1655 insertions, 497 deletions
diff --git a/gnuradio-runtime/include/gnuradio/CMakeLists.txt b/gnuradio-runtime/include/gnuradio/CMakeLists.txt
index 9e22212b7b..b45b84e4fa 100644
--- a/gnuradio-runtime/include/gnuradio/CMakeLists.txt
+++ b/gnuradio-runtime/include/gnuradio/CMakeLists.txt
@@ -20,12 +20,14 @@ install(FILES
block_gateway.h
block_registry.h
buffer.h
+ buffer_context.h
buffer_double_mapped.h
buffer_reader.h
buffer_reader_sm.h
buffer_single_mapped.h
- constants.h
buffer_type.h
+ constants.h
+ custom_lock.h
endianness.h
expj.h
flowgraph.h
@@ -35,6 +37,7 @@ install(FILES
gr_complex.h
hier_block2.h
high_res_timer.h
+ host_buffer.h
integer_math.h
io_signature.h
logger.h
diff --git a/gnuradio-runtime/include/gnuradio/block.h b/gnuradio-runtime/include/gnuradio/block.h
index ee90a35d2c..5fd1316951 100644
--- a/gnuradio-runtime/include/gnuradio/block.h
+++ b/gnuradio-runtime/include/gnuradio/block.h
@@ -524,7 +524,8 @@ public:
void allocate_detail(int ninputs,
int noutputs,
const std::vector<int>& downstream_max_nitems_vec,
- const std::vector<uint64_t>& downstream_lcm_nitems_vec);
+ const std::vector<uint64_t>& downstream_lcm_nitems_vec,
+ const std::vector<uint32_t>& downstream_max_out_mult_vec);
// --------------- Custom buffer-related functions -------------
@@ -539,55 +540,8 @@ public:
* that is incompatible with the default buffer type created by this block.
*
*/
- buffer_sptr replace_buffer(uint32_t out_port, block_sptr block_owner);
-
- /*!
- * \brief Return the type of custom buffer used by the block
- *
- * \details
- * Blocks that wish to allocate custom buffers should override this function.
- */
- virtual buffer_type_t get_buffer_type()
- {
-#if DEBUG_SINGLE_MAPPED
- return buftype_CUSTOM_HOST::get();
-#else
- return buftype_DEFAULT_NON_CUSTOM::get();
-#endif
- }
-
- /*!
- * \brief Allocate a custom buffer for the block
- *
- * \details
- * Blocks that wish to allocate custom buffers should override this function.
- *
- * \param size the size of the buffer to allocate in bytes
- */
- virtual char* allocate_custom_buffer(size_t size)
- {
-#if DEBUG_SINGLE_MAPPED
- return new char[size]();
-#else
- return nullptr;
-#endif
- }
-
- /*!
- * \brief Free a custom buffer previously allocated by allocate_custom_buffer()
- *
- * \details
- * Blocks that wish to allocate custom buffers should override this function.
- *
- * \param buffer a pointer to the buffer
- */
- virtual void free_custom_buffer(char* buffer)
- {
-#if DEBUG_SINGLE_MAPPED
- delete[] buffer;
-#endif
- }
-
+ buffer_sptr
+ replace_buffer(uint32_t src_port, uint32_t dst_port, block_sptr block_owner);
// --------------- Performance counter functions -------------
@@ -989,8 +943,10 @@ protected:
* that the downstream max number of items must be passed in to this
* function for consideration.
*/
- buffer_sptr
- allocate_buffer(int port, int downstream_max_nitems, uint64_t downstream_lcm_nitems);
+ buffer_sptr allocate_buffer(int port,
+ int downstream_max_nitems,
+ uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult);
std::vector<long> d_max_output_buffer;
std::vector<long> d_min_output_buffer;
diff --git a/gnuradio-runtime/include/gnuradio/buffer.h b/gnuradio-runtime/include/gnuradio/buffer.h
index 037fade172..96daf3adcd 100644
--- a/gnuradio-runtime/include/gnuradio/buffer.h
+++ b/gnuradio-runtime/include/gnuradio/buffer.h
@@ -12,11 +12,13 @@
#define INCLUDED_GR_RUNTIME_BUFFER_H
#include <gnuradio/api.h>
+#include <gnuradio/buffer_context.h>
#include <gnuradio/custom_lock.h>
#include <gnuradio/logger.h>
#include <gnuradio/runtime_types.h>
#include <gnuradio/tags.h>
#include <gnuradio/thread/thread.h>
+
#include <boost/weak_ptr.hpp>
#include <iostream>
#include <map>
@@ -29,7 +31,10 @@ class vmcircbuf;
class buffer_reader;
class buffer_reader_sm;
-enum class BufferMappingType { DoubleMapped, SingleMapped };
+enum class buffer_mapping_type { double_mapped, single_mapped };
+
+typedef void* (*memcpy_func_t)(void* dest, const void* src, std::size_t count);
+typedef void* (*memmove_func_t)(void* dest, const void* src, std::size_t count);
/*!
* \brief Allocate a buffer that holds at least \p nitems of size \p sizeof_item.
@@ -40,11 +45,18 @@ enum class BufferMappingType { DoubleMapped, SingleMapped };
*
* \param nitems is the minimum number of items the buffer will hold.
* \param sizeof_item is the size of an item in bytes.
+ * \param downstream_lcm_nitems is the least common multiple of the items to
+ * read by downstream block(s)
+ * \param downstream_max_out_mult is the maximum output multiple of all
+ * downstream blocks
* \param link is the block that writes to this buffer.
+ * \param buf_owner is the block that owns the buffer which may or may not
+ * be the same as the block that writes to this buffer
*/
GR_RUNTIME_API buffer_sptr make_buffer(int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
block_sptr link = block_sptr(),
block_sptr buf_owner = block_sptr());
@@ -63,7 +75,7 @@ public:
/*!
* \brief return the buffer's mapping type
*/
- BufferMappingType get_mapping_type() { return d_buf_map_type; }
+ buffer_mapping_type get_mapping_type() { return d_buf_map_type; }
/*!
* \brief return number of items worth of space available for writing
@@ -89,6 +101,13 @@ public:
virtual void* write_pointer();
/*!
+ * \brief return pointer to read buffer.
+ *
+ * The return value points to at least items_available() items.
+ */
+ virtual const void* _read_pointer(unsigned int read_index);
+
+ /*!
* \brief tell buffer that we wrote \p nitems into it
*/
void update_write_pointer(int nitems);
@@ -118,6 +137,8 @@ public:
uint64_t get_downstream_lcm_nitems() { return d_downstream_lcm_nitems; }
+ uint32_t get_max_reader_output_multiple() { return d_max_reader_output_multiple; }
+
virtual void update_reader_block_history(unsigned history, int delay)
{
d_max_reader_history = std::max(d_max_reader_history, history);
@@ -167,6 +188,32 @@ public:
}
/*!
+ * \brief Function to be executed after this object's owner completes the
+ * call to general_work()
+ */
+ virtual void post_work(int nitems) = 0;
+
+ /*!
+ * \brief Returns true when the current thread is ready to call the callback,
+ * false otherwise. Note if input_blocked_callback is overridden then this
+ * function should also be overridden.
+ */
+ virtual bool input_blkd_cb_ready(int items_required, unsigned read_index)
+ {
+ return false;
+ }
+
+ /*!
+ * \brief Callback function that the scheduler will call when it determines
+ * that the input is blocked. Override this function if needed.
+ */
+ virtual bool
+ input_blocked_callback(int items_required, int items_avail, unsigned read_index)
+ {
+ return false;
+ }
+
+ /*!
* \brief Returns true if the current thread is ready to execute
* output_blocked_callback(), false otherwise. Note if the default
* output_blocked_callback is overridden this function should also be
@@ -220,6 +267,11 @@ public:
// -------------------------------------------------------------------------
+ /*!
+ * \brief Assign buffer context
+ */
+ void set_context(const buffer_context& context);
+
private:
friend class buffer_reader;
friend class buffer_reader_sm;
@@ -236,7 +288,7 @@ private:
protected:
char* d_base; // base address of buffer inside d_vmcircbuf.
unsigned int d_bufsize; // in items
- BufferMappingType d_buf_map_type;
+ buffer_mapping_type d_buf_map_type;
// Keep track of maximum sample delay of any reader; Only prune tags past this.
unsigned d_max_reader_delay;
@@ -270,6 +322,9 @@ protected:
uint64_t d_downstream_lcm_nitems;
uint64_t d_write_multiple;
+ uint32_t d_max_reader_output_multiple;
+
+ buffer_context d_context;
/*!
* \brief Increment read or write index for this buffer
@@ -281,7 +336,7 @@ protected:
*/
virtual unsigned index_sub(unsigned a, unsigned b) = 0;
- virtual bool allocate_buffer(int nitems, size_t sizeof_item) { return false; };
+ virtual bool allocate_buffer(int nitems) { return false; };
/*!
* \brief constructor is private. Use gr_make_buffer to create instances.
@@ -293,16 +348,19 @@ protected:
* \param sizeof_item is the size of an item in bytes.
* \param downstream_lcm_nitems is the least common multiple of the items to
* read by downstream block(s)
+ * \param downstream_max_out_mult is the maximum output multiple of all
+ * downstream blocks
* \param link is the block that writes to this buffer.
*
* The total size of the buffer will be rounded up to a system
* dependent boundary. This is typically the system page size, but
* under MS windows is 64KB.
*/
- buffer(BufferMappingType buftype,
+ buffer(buffer_mapping_type buftype,
int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
block_sptr link);
/*!
diff --git a/gnuradio-runtime/include/gnuradio/buffer_context.h b/gnuradio-runtime/include/gnuradio/buffer_context.h
new file mode 100644
index 0000000000..477a3cbd43
--- /dev/null
+++ b/gnuradio-runtime/include/gnuradio/buffer_context.h
@@ -0,0 +1,30 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2021 BlackLynx, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifndef INCLUDED_GR_RUNTIME_BUFFER_CONTEXT_H
+#define INCLUDED_GR_RUNTIME_BUFFER_CONTEXT_H
+
+#include <gnuradio/api.h>
+#include <ostream>
+
+namespace gr {
+
+enum class buffer_context {
+ DEFAULT_INVALID,
+ HOST_TO_DEVICE,
+ DEVICE_TO_HOST,
+ HOST_TO_HOST,
+ DEVICE_TO_DEVICE
+};
+
+GR_RUNTIME_API std::ostream& operator<<(std::ostream& os, const buffer_context& context);
+} // namespace gr
+
+#endif
diff --git a/gnuradio-runtime/include/gnuradio/buffer_double_mapped.h b/gnuradio-runtime/include/gnuradio/buffer_double_mapped.h
index b8cc7cdc87..e23432bb13 100644
--- a/gnuradio-runtime/include/gnuradio/buffer_double_mapped.h
+++ b/gnuradio-runtime/include/gnuradio/buffer_double_mapped.h
@@ -13,6 +13,7 @@
#include <gnuradio/api.h>
#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_type.h>
#include <gnuradio/logger.h>
#include <gnuradio/runtime_types.h>
@@ -28,7 +29,11 @@ class vmcircbuf;
GR_RUNTIME_API buffer_sptr make_buffer_double_mapped(int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
- block_sptr link = block_sptr());
+ uint32_t downstream_max_out_mult,
+ block_sptr link = block_sptr(),
+ block_sptr buf_owner = block_sptr());
+
+MAKE_CUSTOM_BUFFER_TYPE(DEFAULT_NON_CUSTOM, make_buffer_double_mapped);
/*!
* \brief Single writer, multiple reader fifo.
@@ -37,6 +42,8 @@ GR_RUNTIME_API buffer_sptr make_buffer_double_mapped(int nitems,
class GR_RUNTIME_API buffer_double_mapped : public buffer
{
public:
+ static buffer_type type;
+
gr::logger_ptr d_logger;
gr::logger_ptr d_debug_logger;
@@ -47,12 +54,18 @@ public:
*/
virtual int space_available();
+ /*!
+ * Inherited from buffer class.
+ * @param nitems is the number of items produced by the general_work() function.
+ */
+ virtual void post_work(int nitems) {}
+
protected:
/*!
* sets d_vmcircbuf, d_base, d_bufsize.
* returns true iff successful.
*/
- bool allocate_buffer(int nitems, size_t sizeof_item);
+ bool allocate_buffer(int nitems);
virtual unsigned index_add(unsigned a, unsigned b)
{
@@ -84,8 +97,13 @@ private:
uint64_t downstream_lcm_nitems,
block_sptr link,
block_sptr buf_owner);
- friend GR_RUNTIME_API buffer_sptr make_buffer_double_mapped(
- int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, block_sptr link);
+ friend GR_RUNTIME_API buffer_sptr
+ make_buffer_double_mapped(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
+ block_sptr link,
+ block_sptr buf_owner);
std::unique_ptr<gr::vmcircbuf> d_vmcircbuf;
@@ -107,6 +125,7 @@ private:
buffer_double_mapped(int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
block_sptr link);
};
diff --git a/gnuradio-runtime/include/gnuradio/buffer_reader.h b/gnuradio-runtime/include/gnuradio/buffer_reader.h
index a7e51d02e9..4885d9b4c1 100644
--- a/gnuradio-runtime/include/gnuradio/buffer_reader.h
+++ b/gnuradio-runtime/include/gnuradio/buffer_reader.h
@@ -51,7 +51,6 @@ class GR_RUNTIME_API buffer_reader
{
public:
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
gr::logger_ptr d_logger;
gr::logger_ptr d_debug_logger;
#endif
@@ -77,7 +76,7 @@ public:
/*!
* \brief Return number of items available for reading.
*/
- virtual int items_available(); // const
+ virtual int items_available() const;
/*!
* \brief Return buffer this reader reads from.
@@ -121,7 +120,7 @@ public:
* \brief Return the block that reads via this reader.
*
*/
- block_sptr link() { return block_sptr(d_link); }
+ block_sptr link() const { return block_sptr(d_link); }
/*!
* \brief Given a [start,end), returns a vector all tags in the range.
@@ -144,14 +143,16 @@ public:
/*!
* \brief Returns true when the current thread is ready to call the callback,
- * false otherwise. Note if input_blocked_callback is overridden then this
- * function should also be overridden.
+ * false otherwise. Delegate calls to buffer class's input_blkd_cb_ready().
+ * Note if input_blocked_callback is overridden then this function should
+ * also be overridden.
*/
virtual bool input_blkd_cb_ready(int items_required) const { return false; }
/*!
* \brief Callback function that the scheduler will call when it determines
- * that the input is blocked. Override this function if needed.
+ * that the input is blocked. Delegate calls to buffer class's
+ * input_blocked_callback(). Override this function if needed.
*/
virtual bool input_blocked_callback(int items_required, int items_avail)
{
diff --git a/gnuradio-runtime/include/gnuradio/buffer_reader_sm.h b/gnuradio-runtime/include/gnuradio/buffer_reader_sm.h
index 627bfd8582..7e203c93b7 100644
--- a/gnuradio-runtime/include/gnuradio/buffer_reader_sm.h
+++ b/gnuradio-runtime/include/gnuradio/buffer_reader_sm.h
@@ -32,17 +32,18 @@ public:
/*!
* \brief Return number of items available for reading.
*/
- virtual int items_available(); // const
+ virtual int items_available() const;
/*!
* \brief Return true if thread is ready to call input_blocked_callback,
- * false otherwise
+ * false otherwise; delegate calls to buffer class's input_blkd_cb_ready()
*/
virtual bool input_blkd_cb_ready(int items_required) const;
/*!
* \brief Callback function that the scheduler will call when it determines
- * that the input is blocked
+ * that the input is blocked; delegate calls to buffer class's
+ * input_blocked_callback()
*/
virtual bool input_blocked_callback(int items_required, int items_avail);
diff --git a/gnuradio-runtime/include/gnuradio/buffer_single_mapped.h b/gnuradio-runtime/include/gnuradio/buffer_single_mapped.h
index 5377534aa8..3b2edc0aef 100644
--- a/gnuradio-runtime/include/gnuradio/buffer_single_mapped.h
+++ b/gnuradio-runtime/include/gnuradio/buffer_single_mapped.h
@@ -11,20 +11,20 @@
#ifndef INCLUDED_GR_RUNTIME_BUFFER_SINGLE_MAPPED_H
#define INCLUDED_GR_RUNTIME_BUFFER_SINGLE_MAPPED_H
+#include <cstddef>
#include <functional>
#include <gnuradio/api.h>
-#include <gnuradio/block.h>
#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_reader.h>
#include <gnuradio/logger.h>
#include <gnuradio/runtime_types.h>
namespace gr {
/*!
- * TODO: update this
- *
- * \brief Single writer, multiple reader fifo.
+ * \brief A single mapped buffer where wrapping conditions are handled explicitly
+ * via input/output_blocked_callback functions called from block_executor.
* \ingroup internal
*/
class GR_RUNTIME_API buffer_single_mapped : public buffer
@@ -45,43 +45,20 @@ public:
*/
virtual int space_available();
- virtual void update_reader_block_history(unsigned history, int delay)
- {
- unsigned old_max = d_max_reader_history;
- d_max_reader_history = std::max(d_max_reader_history, history);
- if (d_max_reader_history != old_max) {
- d_write_index = d_max_reader_history - 1;
-
-#ifdef BUFFER_DEBUG
- std::ostringstream msg;
- msg << "[" << this << "] "
- << "buffer_single_mapped constructor -- set wr index to: "
- << d_write_index;
- GR_LOG_DEBUG(d_logger, msg.str());
-#endif
-
- // Reset the reader's read index if the buffer's write index has changed.
- // Note that "history - 1" is the nzero_preload value passed to
- // buffer_add_reader.
- for (auto reader : d_readers) {
- reader->d_read_index = d_write_index - (reader->link()->history() - 1);
- }
- }
-
- // Only attempt to set has history flag if it is not already set
- if (!d_has_history) {
- // Blocks that set delay may set history to delay + 1 but this is
- // not "real" history
- d_has_history = ((static_cast<int>(history) - 1) != delay);
- }
- }
+ virtual void update_reader_block_history(unsigned history, int delay);
- void deleter(char* ptr)
- {
- // Delegate free of the underlying buffer to the block that owns it
- if (ptr != nullptr)
- buf_owner()->free_custom_buffer(ptr);
- }
+ /*!
+ * \brief Return true if thread is ready to call input_blocked_callback,
+ * false otherwise
+ */
+ virtual bool input_blkd_cb_ready(int items_required, unsigned read_index);
+
+ /*!
+ * \brief Callback function that the scheduler will call when it determines
+ * that the input is blocked. Override this function if needed.
+ */
+ virtual bool
+ input_blocked_callback(int items_required, int items_avail, unsigned read_index) = 0;
/*!
* \brief Return true if thread is ready to call the callback, false otherwise
@@ -92,14 +69,21 @@ public:
* \brief Callback function that the scheduler will call when it determines
* that the output is blocked
*/
- virtual bool output_blocked_callback(int output_multiple, bool force);
+ virtual bool output_blocked_callback(int output_multiple, bool force) = 0;
protected:
/*!
- * sets d_base, d_bufsize.
- * returns true iff successful.
+ * \brief Make reasonable attempt to adjust nitems based on read/write
+ * granularity then delegate actual allocation to do_allocate_buffer().
+ * @return true iff successful.
+ */
+ virtual bool allocate_buffer(int nitems);
+
+ /*!
+ * \brief Do actual buffer allocation. This is intended (required) to be
+ * handled by the derived class.
*/
- bool allocate_buffer(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems);
+ virtual bool do_allocate_buffer(size_t final_nitems, size_t sizeof_item) = 0;
virtual unsigned index_add(unsigned a, unsigned b)
{
@@ -124,7 +108,7 @@ protected:
return s;
}
-private:
+
friend class buffer_reader;
friend GR_RUNTIME_API buffer_sptr make_buffer(int nitems,
@@ -135,7 +119,7 @@ private:
block_sptr d_buf_owner; // block that "owns" this buffer
- std::unique_ptr<char, std::function<void(char*)>> d_buffer;
+ std::unique_ptr<char> d_buffer;
/*!
* \brief constructor is private. Use gr_make_buffer to create instances.
@@ -157,8 +141,70 @@ private:
buffer_single_mapped(int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
block_sptr link,
block_sptr buf_owner);
+
+ /*!
+ * \brief Abstracted logic for the input blocked callback function.
+ *
+ * This function contains the logic for the input blocked callback however
+ * the data adjustment portion of the callback has been abstracted to allow
+ * the caller to pass in the desired buffer and corresponding buffer
+ * manipulation functions (memcpy and memmove).
+ *
+ * The input blocked callback is called when a reader needs to read more
+ * data than is available in a buffer and the available data is located at
+ * the end of the buffer. The input blocked callback will attempt to move
+ * any data located at the beginning of the buffer "down", and will then
+ * attempt to copy from the end of the buffer back to the beginning of the
+ * buffer. This process explicitly handles wrapping for a single mapped
+ * buffer and will realign the data at the beginning of the buffer such
+ * that the reader is able to read the available data and becomes unblocked.
+ *
+ * \param items_required is the number of items required by the reader
+ * \param items_avail is the number of items available
+ * \param read_index is the current read index of the buffer reader caller
+ * \param buffer_ptr is the pointer to the desired buffer
+ * \param memcpy_func is a pointer to a memcpy function appropriate for the
+ * the passed in buffer
+ * \param memmove_func is a pointer to a memmove function appropriate for
+ * the passed in buffer
+ */
+ virtual bool input_blocked_callback_logic(int items_required,
+ int items_avail,
+ unsigned read_index,
+ char* buffer_ptr,
+ memcpy_func_t memcpy_func,
+ memmove_func_t memmove_func);
+
+ /*!
+ * \brief Abstracted logic for the output blocked callback function.
+ *
+ * This function contains the logic for the output blocked callback however
+ * the data adjustment portion of the callback has been abstracted to allow
+ * the caller to pass in the desired buffer and corresponding buffer
+ * manipulation functions (memcpy and memmove).
+ *
+ * The output blocked callback is called when a block needs to write data
+ * to the end of a single mapped buffer but not enough free space exists to
+ * write the data before the end of the buffer is reached. The output blocked
+ * callback will attempt to copy data located towards the end of a single
+ * mapped buffer back to the beginning of the buffer. This process explicitly
+ * handles wrapping for a single mapped buffer and will realign data located
+ * at the end of a buffer back to the beginning of the buffer such that the
+ * writing block can write its output into the buffer after the existing data.
+ *
+ * \param output_multiple
+ * \param force run the callback disregarding the internal checks
+ * \param buffer_ptr is the pointer to the desired buffer
+ * \param memmove_func is a pointer to a memmove function appropriate for
+ * the passed in buffer
+ */
+ virtual bool output_blocked_callback_logic(int output_multiple,
+ bool force,
+ char* buffer_ptr,
+ memmove_func_t memmove_func);
};
} /* namespace gr */
diff --git a/gnuradio-runtime/include/gnuradio/buffer_type.h b/gnuradio-runtime/include/gnuradio/buffer_type.h
index 527b5ff8e3..3d69f72ca6 100644
--- a/gnuradio-runtime/include/gnuradio/buffer_type.h
+++ b/gnuradio-runtime/include/gnuradio/buffer_type.h
@@ -12,32 +12,55 @@
#define INCLUDED_GR_RUNTIME_CUSTOM_BUFFER_TYPE_H
#include <gnuradio/api.h>
+#include <gnuradio/runtime_types.h>
#include <cstdint>
+#include <functional>
#include <mutex>
#include <string>
+#include <vector>
namespace gr {
+// This is the function pointer declaration for the factory-like functions
+// used to create buffer subclasses
+typedef buffer_sptr (*factory_func_ptr)(int nitems,
+ size_t sizeof_item,
+ uint64_t downstrea_lcm_nitems,
+ uint32_t downstream_max_out_mult,
+ block_sptr link,
+ block_sptr buf_owner);
+
+/*!
+ * \brief Base class for describing a buffer's type.
+ */
class GR_RUNTIME_API buffer_type_base
{
public:
virtual ~buffer_type_base(){};
// Do not allow copying or assignment
- buffer_type_base(buffer_type_base const&) = delete;
+ // buffer_type_base(buffer_type_base const&) = delete;
+
+ // Temporarily define copy constructor to work around pybind issue with
+ // default non-copyable function argument
+ buffer_type_base(buffer_type_base const& other)
+ : d_name(other.d_name), d_factory(other.d_factory)
+ {
+ }
+
void operator=(buffer_type_base const&) = delete;
// Allow equality and inequality comparison
bool operator==(const buffer_type_base& other) const
{
- return d_value == other.d_value;
+ return d_name == other.d_name;
}
bool operator!=(const buffer_type_base& other) const
{
- return d_value != other.d_value;
+ return d_name != other.d_name;
}
// Do not allow other comparison (just in case)
@@ -45,44 +68,51 @@ public:
bool operator>(const buffer_type_base& other) = delete;
bool operator<=(const buffer_type_base& other) = delete;
bool operator>=(const buffer_type_base& other) = delete;
-
+ /*!
+ * \brief Get the human-readable name of the type
+ */
const std::string& name() const { return d_name; }
-protected:
- static uint32_t s_nextId;
- static std::mutex s_mutex;
+ /*!
+ * \brief Make and return a buffer subclass of the corresponding type
+ */
+ inline buffer_sptr make_buffer(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
+ block_sptr link,
+ block_sptr buf_owner) const
+ {
+ // Delegate call to factory function
+ return d_factory(nitems,
+ sizeof_item,
+ downstream_lcm_nitems,
+ downstream_max_out_mult,
+ link,
+ buf_owner);
+ }
- uint32_t d_value;
- std::string d_name;
+protected:
+ const std::string d_name;
+ factory_func_ptr d_factory;
- // Private constructor
- buffer_type_base(const char* name) : d_name(name)
+ // Protected constructor
+ buffer_type_base(const char* name, factory_func_ptr factory_func)
+ : d_name(name), d_factory(factory_func)
{
- std::lock_guard<std::mutex> lock(s_mutex);
- d_value = s_nextId++;
}
};
-typedef const buffer_type_base& buffer_type_t;
-
-
-#define MAKE_CUSTOM_BUFFER_TYPE(CLASSNAME) \
- class GR_RUNTIME_API buftype_##CLASSNAME : public buffer_type_base \
- { \
- public: \
- static buffer_type_t get() \
- { \
- static buftype_##CLASSNAME instance; \
- return instance; \
- } \
- \
- private: \
- buftype_##CLASSNAME() : buffer_type_base(#CLASSNAME) {} \
- };
+typedef const buffer_type_base& buffer_type;
+typedef std::vector<std::reference_wrapper<const buffer_type_base>> gr_vector_buffer_type;
-MAKE_CUSTOM_BUFFER_TYPE(DEFAULT_NON_CUSTOM);
-MAKE_CUSTOM_BUFFER_TYPE(CUSTOM_HOST); // used only for test purposes
+#define MAKE_CUSTOM_BUFFER_TYPE(CLASSNAME, FACTORY_FUNC_PTR) \
+ class GR_RUNTIME_API buftype_##CLASSNAME : public buffer_type_base \
+ { \
+ public: \
+ buftype_##CLASSNAME() : buffer_type_base(#CLASSNAME, FACTORY_FUNC_PTR) {} \
+ };
} // namespace gr
-#endif /* INCLUDED_GR_RUNTIME_CUSTOM_BUFFER_TYPE_H */ \ No newline at end of file
+#endif /* INCLUDED_GR_RUNTIME_CUSTOM_BUFFER_TYPE_H */
diff --git a/gnuradio-runtime/include/gnuradio/custom_lock.h b/gnuradio-runtime/include/gnuradio/custom_lock.h
index 6422254d99..5011abe5eb 100644
--- a/gnuradio-runtime/include/gnuradio/custom_lock.h
+++ b/gnuradio-runtime/include/gnuradio/custom_lock.h
@@ -19,7 +19,9 @@ namespace gr {
/*!
* Custom lock interface. Objects should implement this interface in order to
- * use the custom_lock object below.
+ * use the custom_lock object below. The interface defines two functions that,
+ * as their names suggest, are called when the lock is locked and unlocked
+ * respectively.
*/
class custom_lock_if
{
@@ -36,7 +38,11 @@ public:
};
/*!
- * Write me!
+ * Class that defines a lock using a mutex and a "locker" that implements the
+ * custom_lock_if interface. The interface defines an on_lock() function that
+ * is executed when the lock is locked and an on_unlock() function that the
+ * is called when the lock is unlocked. Calls to these two functions are
+ * delegated to the locker object.
*/
class custom_lock
{
diff --git a/gnuradio-runtime/include/gnuradio/host_buffer.h b/gnuradio-runtime/include/gnuradio/host_buffer.h
new file mode 100644
index 0000000000..83f46d2e85
--- /dev/null
+++ b/gnuradio-runtime/include/gnuradio/host_buffer.h
@@ -0,0 +1,126 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2021 BlackLynx Inc.
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+#ifndef INCLUDED_HOST_BUFFER_H
+#define INCLUDED_HOST_BUFFER_H
+
+#include <gnuradio/buffer_single_mapped.h>
+#include <gnuradio/buffer_type.h>
+#include <cstddef>
+
+namespace gr {
+
+
+class GR_RUNTIME_API host_buffer : public buffer_single_mapped
+{
+public:
+ static void* device_memcpy(void* dest, const void* src, std::size_t count);
+ static void* device_memmove(void* dest, const void* src, std::size_t count);
+
+ static buffer_type type;
+
+ virtual ~host_buffer();
+
+ /*!
+ * \brief Handles post-general_work() cleanup and data transfer
+ *
+ * Called directly after call to general_work() completes and
+ * is used for data transfer (and perhaps other administrative
+ * activities)
+ *
+ * \param nitems is the number of items produced by the general_work() function.
+ */
+ void post_work(int nitems);
+
+ /*!
+ * \brief Do actual buffer allocation. Inherited from buffer_single_mapped.
+ */
+ bool do_allocate_buffer(size_t final_nitems, size_t sizeof_item);
+
+ /*!
+ * \brief Return a pointer to the write buffer depending on the context
+ */
+ virtual void* write_pointer();
+
+ /*!
+ * \brief return pointer to read buffer depending on the context
+ *
+ * The return value points to at least items_available() items.
+ */
+ virtual const void* _read_pointer(unsigned int read_index);
+
+ /*!
+ * \brief Callback function that the scheduler will call when it determines
+ * that the input is blocked. Override this function if needed.
+ */
+ virtual bool
+ input_blocked_callback(int items_required, int items_avail, unsigned read_index);
+
+ /*!
+ * \brief Callback function that the scheduler will call when it determines
+ * that the output is blocked
+ */
+ virtual bool output_blocked_callback(int output_multiple, bool force);
+
+ /*!
+ * \brief Creates a new host_buffer object
+ *
+ * \param nitems
+ * \param sizeof_item
+ * \param downstream_lcm_nitems
+ * \param link
+ * \param buf_owner
+ *
+ * \return pointer to buffer base class
+ */
+ static buffer_sptr make_host_buffer(int nitems,
+ std::size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
+ block_sptr link,
+ block_sptr buf_owner);
+
+private:
+ // This is the simulated device buffer
+ std::unique_ptr<char> d_device_buf;
+ char* d_device_base;
+
+ /*!
+ * \brief constructor is private. Use the static make_host_buffer function
+ * to create instances.
+ *
+ * Allocate a buffer that holds at least \p nitems of size \p sizeof_item.
+ *
+ * \param nitems is the minimum number of items the buffer will hold.
+ * \param sizeof_item is the size of an item in bytes.
+ * \param downstream_lcm_nitems is the least common multiple of the items to
+ * read by downstream blocks
+ * \param downstream_max_out_mult is the maximum output multiple of all
+ * downstream blocks
+ * \param link is the block that writes to this buffer.
+ * \param buf_owner if the block that owns the buffer which may or may not
+ * be the same as the block that writes to this buffer
+ *
+ * The total size of the buffer will be rounded up to a system
+ * dependent boundary. This is typically the system page size, but
+ * under MS windows is 64KB.
+ */
+ host_buffer(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
+ block_sptr link,
+ block_sptr buf_owner);
+};
+
+// See buffer_type.h for details on this macro. It is used here to generate
+// compile-time class representing the host_buffer classes "type".
+MAKE_CUSTOM_BUFFER_TYPE(HOST_BUFFER, &host_buffer::make_host_buffer);
+
+} // namespace gr
+
+#endif /* INCLUDED_HOST_BUFFER_H */
diff --git a/gnuradio-runtime/include/gnuradio/io_signature.h b/gnuradio-runtime/include/gnuradio/io_signature.h
index 8d38e0d44c..60ac798f79 100644
--- a/gnuradio-runtime/include/gnuradio/io_signature.h
+++ b/gnuradio-runtime/include/gnuradio/io_signature.h
@@ -11,7 +11,11 @@
#ifndef INCLUDED_IO_SIGNATURE_H
#define INCLUDED_IO_SIGNATURE_H
+#include <functional>
+
#include <gnuradio/api.h>
+#include <gnuradio/buffer_double_mapped.h>
+#include <gnuradio/buffer_type.h>
#include <gnuradio/runtime_types.h>
namespace gr {
@@ -25,10 +29,12 @@ class GR_RUNTIME_API io_signature
int d_min_streams;
int d_max_streams;
std::vector<int> d_sizeof_stream_item;
+ gr_vector_buffer_type d_stream_buffer_type;
io_signature(int min_streams,
int max_streams,
- const std::vector<int>& sizeof_stream_items);
+ const std::vector<int>& sizeof_stream_items,
+ gr_vector_buffer_type buftypes);
public:
typedef std::shared_ptr<io_signature> sptr;
@@ -44,8 +50,13 @@ public:
* \param min_streams specify minimum number of streams (>= 0)
* \param max_streams specify maximum number of streams (>= min_streams or -1 ->
* infinite) \param sizeof_stream_item specify the size of the items in each stream
+ * \param buftype type of buffers the streams should use (defaults to standard host
+ * double mapped buffer)
*/
- static sptr make(int min_streams, int max_streams, int sizeof_stream_item);
+ static sptr make(int min_streams,
+ int max_streams,
+ int sizeof_stream_item,
+ buffer_type buftype = buffer_double_mapped::type);
/*!
* \brief Create an i/o signature
@@ -55,11 +66,17 @@ public:
* infinite) \param sizeof_stream_item1 specify the size of the items in the first
* stream \param sizeof_stream_item2 specify the size of the items in the second and
* subsequent streams
+ * \param buftype1 type of buffers the first stream should use (defaults to standard
+ * host double mapped buffer)
+ * \param buftype2 type of buffers the second and subsequent streams should use
+ * (defaults to standard host double mapped buffer)
*/
static sptr make2(int min_streams,
int max_streams,
int sizeof_stream_item1,
- int sizeof_stream_item2);
+ int sizeof_stream_item2,
+ buffer_type buftype1 = buffer_double_mapped::type,
+ buffer_type buftype2 = buffer_double_mapped::type);
/*!
* \brief Create an i/o signature
@@ -70,12 +87,21 @@ public:
* stream \param sizeof_stream_item2 specify the size of the items in the second
* stream \param sizeof_stream_item3 specify the size of the items in the third and
* subsequent streams
+ * \param buftype1 type of buffers the first stream should use (defaults to standard
+ * host double mapped buffer)
+ * \param buftype2 type of buffers the second stream should use (defaults to standard
+ * host double mapped buffer)
+ * \param buftype3 type of buffers the third and subsequent streams should use
+ * (defaults to standard host double mapped buffer)
*/
static sptr make3(int min_streams,
int max_streams,
int sizeof_stream_item1,
int sizeof_stream_item2,
- int sizeof_stream_item3);
+ int sizeof_stream_item3,
+ buffer_type buftype1 = buffer_double_mapped::type,
+ buffer_type buftype2 = buffer_double_mapped::type,
+ buffer_type buftype3 = buffer_double_mapped::type);
/*!
* \brief Create an i/o signature
@@ -92,10 +118,30 @@ public:
static sptr
makev(int min_streams, int max_streams, const std::vector<int>& sizeof_stream_items);
+ /*!
+ * \brief Create an i/o signature
+ *
+ * \param min_streams specify minimum number of streams (>= 0)
+ * \param max_streams specify maximum number of streams (>= min_streams or -1 ->
+ * infinite) \param sizeof_stream_items specify the size of the items in the streams
+ * \param buftypes the type of buffer each stream will should use
+ *
+ * If there are more streams than there are entries in
+ * sizeof_stream_items, the value of the last entry in
+ * sizeof_stream_items is used for the missing values.
+ * sizeof_stream_items must contain at least 1 entry.
+ */
+ static sptr makev(int min_streams,
+ int max_streams,
+ const std::vector<int>& sizeof_stream_items,
+ gr_vector_buffer_type buftypes);
+
int min_streams() const { return d_min_streams; }
int max_streams() const { return d_max_streams; }
int sizeof_stream_item(int index) const;
std::vector<int> sizeof_stream_items() const;
+ buffer_type stream_buffer_type(size_t index) const;
+ gr_vector_buffer_type stream_buffer_types() const;
};
} /* namespace gr */
diff --git a/gnuradio-runtime/lib/CMakeLists.txt b/gnuradio-runtime/lib/CMakeLists.txt
index ca96549386..6fcd31bac4 100644
--- a/gnuradio-runtime/lib/CMakeLists.txt
+++ b/gnuradio-runtime/lib/CMakeLists.txt
@@ -55,16 +55,17 @@ add_library(gnuradio-runtime
block_gateway_impl.cc
block_registry.cc
buffer.cc
+ buffer_context.cc
buffer_double_mapped.cc
buffer_reader.cc
buffer_reader_sm.cc
buffer_single_mapped.cc
- buffer_type.cc
flat_flowgraph.cc
flowgraph.cc
hier_block2.cc
hier_block2_detail.cc
high_res_timer.cc
+ host_buffer.cc
io_signature.cc
local_sighandler.cc
logger.cc
@@ -335,6 +336,7 @@ if(ENABLE_TESTING)
qa_buffer.cc
qa_io_signature.cc
qa_logger.cc
+ qa_host_buffer.cc
qa_vmcircbuf.cc
)
list(APPEND GR_TEST_TARGET_DEPS gnuradio-runtime gnuradio-pmt)
diff --git a/gnuradio-runtime/lib/block.cc b/gnuradio-runtime/lib/block.cc
index bb6ce95298..75f9dc6ae4 100644
--- a/gnuradio-runtime/lib/block.cc
+++ b/gnuradio-runtime/lib/block.cc
@@ -384,7 +384,8 @@ void block::set_min_output_buffer(int port, long min_output_buffer)
void block::allocate_detail(int ninputs,
int noutputs,
const std::vector<int>& downstream_max_nitems_vec,
- const std::vector<uint64_t>& downstream_lcm_nitems_vec)
+ const std::vector<uint64_t>& downstream_lcm_nitems_vec,
+ const std::vector<uint32_t>& downstream_max_out_mult_vec)
{
block_detail_sptr detail = make_block_detail(ninputs, noutputs);
@@ -393,8 +394,10 @@ void block::allocate_detail(int ninputs,
for (int i = 0; i < noutputs; i++) {
expand_minmax_buffer(i);
- buffer_sptr buffer = allocate_buffer(
- i, downstream_max_nitems_vec[i], downstream_lcm_nitems_vec[i]);
+ buffer_sptr buffer = allocate_buffer(i,
+ downstream_max_nitems_vec[i],
+ downstream_lcm_nitems_vec[i],
+ downstream_max_out_mult_vec[i]);
GR_LOG_DEBUG(d_debug_logger,
"Allocated buffer for output " + identifier() + " " +
std::to_string(i));
@@ -413,19 +416,24 @@ void block::allocate_detail(int ninputs,
set_detail(detail);
}
-buffer_sptr block::replace_buffer(uint32_t out_port, block_sptr block_owner)
+buffer_sptr
+block::replace_buffer(uint32_t src_port, uint32_t dst_port, block_sptr block_owner)
{
block_detail_sptr detail_ = detail();
- buffer_sptr orig_buffer = detail_->output(out_port);
+ buffer_sptr orig_buffer = detail_->output(src_port);
- // Make a new buffer but this time use the passed in block as the owner
- buffer_sptr new_buffer = make_buffer(orig_buffer->bufsize(),
- orig_buffer->get_sizeof_item(),
- orig_buffer->get_downstream_lcm_nitems(),
- shared_from_base<block>(),
- block_owner);
+ buffer_type buftype = block_owner->output_signature()->stream_buffer_type(dst_port);
- detail_->set_output(out_port, new_buffer);
+ // Make a new buffer but this time use the passed in block as the owner
+ buffer_sptr new_buffer =
+ buftype.make_buffer(orig_buffer->bufsize(),
+ orig_buffer->get_sizeof_item(),
+ orig_buffer->get_downstream_lcm_nitems(),
+ orig_buffer->get_max_reader_output_multiple(),
+ shared_from_base<block>(),
+ block_owner);
+
+ detail_->set_output(src_port, new_buffer);
return new_buffer;
}
@@ -435,7 +443,8 @@ void block::enable_update_rate(bool en) { d_update_rate = en; }
buffer_sptr block::allocate_buffer(int port,
int downstream_max_nitems,
- uint64_t downstream_lcm_nitems)
+ uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult)
{
int item_size = output_signature()->sizeof_stream_item(port);
@@ -473,14 +482,16 @@ buffer_sptr block::allocate_buffer(int port,
buffer_sptr buf;
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
GR_LOG_DEBUG(d_logger,
"Block: " + name() + " allocated buffer for output " + identifier());
#endif
+ // Grab the buffer type associated with the output port and use it to
+ // create the specified type of buffer
+ buffer_type buftype = output_signature()->stream_buffer_type(port);
+
try {
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
std::ostringstream msg;
msg << "downstream_max_nitems: " << downstream_max_nitems
<< " -- downstream_lcm_nitems: " << downstream_lcm_nitems
@@ -498,18 +509,20 @@ buffer_sptr block::allocate_buffer(int port,
}
GR_LOG_DEBUG(d_logger, msg.str());
#endif
- buf = make_buffer(nitems,
- item_size,
- downstream_lcm_nitems,
- shared_from_base<block>(),
- shared_from_base<block>());
+ buf = buftype.make_buffer(nitems,
+ item_size,
+ downstream_lcm_nitems,
+ downstream_max_out_mult,
+ shared_from_base<block>(),
+ shared_from_base<block>());
} catch (std::bad_alloc&) {
- buf = make_buffer(nitems,
- item_size,
- downstream_lcm_nitems,
- shared_from_base<block>(),
- shared_from_base<block>());
+ buf = buftype.make_buffer(nitems,
+ item_size,
+ downstream_lcm_nitems,
+ downstream_max_out_mult,
+ shared_from_base<block>(),
+ shared_from_base<block>());
}
// Set the max noutput items size here to make sure it's always
diff --git a/gnuradio-runtime/lib/block_detail.cc b/gnuradio-runtime/lib/block_detail.cc
index f5283c56b0..66b32ec078 100644
--- a/gnuradio-runtime/lib/block_detail.cc
+++ b/gnuradio-runtime/lib/block_detail.cc
@@ -114,6 +114,7 @@ void block_detail::consume_each(int how_many_items)
void block_detail::produce(int which_output, int how_many_items)
{
if (how_many_items > 0) {
+ d_output[which_output]->post_work(how_many_items);
d_output[which_output]->update_write_pointer(how_many_items);
d_produce_or |= how_many_items;
}
@@ -123,6 +124,7 @@ void block_detail::produce_each(int how_many_items)
{
if (how_many_items > 0) {
for (int i = 0; i < noutputs(); i++) {
+ d_output[i]->post_work(how_many_items);
d_output[i]->update_write_pointer(how_many_items);
}
d_produce_or |= how_many_items;
diff --git a/gnuradio-runtime/lib/block_executor.cc b/gnuradio-runtime/lib/block_executor.cc
index 6fbf2c5c17..ea59196571 100644
--- a/gnuradio-runtime/lib/block_executor.cc
+++ b/gnuradio-runtime/lib/block_executor.cc
@@ -59,9 +59,11 @@ static int min_available_space(block* m,
int min_noutput_items,
int& output_idx)
{
+#if ENABLE_LOGGING
gr::logger_ptr logger;
gr::logger_ptr debug_logger;
gr::configure_default_loggers(logger, debug_logger, "min_available_space");
+#endif
int min_space = std::numeric_limits<int>::max();
if (min_noutput_items == 0)
@@ -285,7 +287,7 @@ block_executor::state block_executor::run_one_iteration()
// determine the minimum available output space
output_idx = 0;
- out_try_again:
+ blkd_out_try_again:
noutput_items = min_available_space(
m, d, m->output_multiple(), m->min_noutput_items(), output_idx);
noutput_items = std::min(noutput_items, max_noutput_items);
@@ -312,7 +314,7 @@ block_executor::state block_executor::run_one_iteration()
msg << m << " -- BLKD_OUT -- ([1] try again idx: " << output_idx
<< ")";
GR_LOG_INFO(d_debug_logger, msg.str()););
- goto out_try_again;
+ goto blkd_out_try_again;
}
} else {
return BLKD_OUT;
@@ -329,7 +331,6 @@ block_executor::state block_executor::run_one_iteration()
d_input_done.resize(d->ninputs());
d_output_items.resize(0);
d_start_nitems_read.resize(d->ninputs());
- // LOG(GR_LOG_INFO(d_debug_logger, "sink"););
LOG(std::ostringstream msg; msg << m << " -- sink";
GR_LOG_INFO(d_debug_logger, msg.str()););
@@ -367,7 +368,6 @@ block_executor::state block_executor::run_one_iteration()
GR_LOG_INFO(d_debug_logger, msg.str()););
if (noutput_items == 0) { // we're blocked on input
- // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN"););
LOG(std::ostringstream msg; msg << m << " -- BLKD_IN";
GR_LOG_INFO(d_debug_logger, msg.str()));
return BLKD_IN;
@@ -400,7 +400,7 @@ block_executor::state block_executor::run_one_iteration()
// determine the minimum available output space
output_idx = 0;
- out_try_again2:
+ blkd_out_try_again2:
noutput_items = min_available_space(
m, d, m->output_multiple(), m->min_noutput_items(), output_idx);
if (ENABLE_LOGGING) {
@@ -415,7 +415,6 @@ block_executor::state block_executor::run_one_iteration()
goto were_done;
if (noutput_items == 0) { // we're output blocked
- // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT"););
LOG(std::ostringstream msg; msg << m << " -- BLKD_OUT";
GR_LOG_INFO(d_debug_logger, msg.str()));
@@ -435,7 +434,7 @@ block_executor::state block_executor::run_one_iteration()
msg << m << " -- BLKD_OUT -- ([2] try again idx: " << output_idx
<< ")";
GR_LOG_INFO(d_debug_logger, msg.str()););
- goto out_try_again2;
+ goto blkd_out_try_again2;
}
} else {
return BLKD_OUT;
@@ -532,10 +531,6 @@ block_executor::state block_executor::run_one_iteration()
buffer_reader_sptr in_buf = d->input(i);
- LOG(std::ostringstream msg;
- msg << m << " (t: " << this << ") -- pre-callback";
- GR_LOG_DEBUG(d_debug_logger, msg.str()));
-
if (in_buf->input_blkd_cb_ready(d_ninput_items_required[i])) {
gr::custom_lock lock(std::ref(*in_buf->mutex()), in_buf->buffer());
if (in_buf->input_blocked_callback(d_ninput_items_required[i],
@@ -681,8 +676,8 @@ block_executor::state block_executor::run_one_iteration()
GR_LOG_DEBUG(d_debug_logger, msg.str()););
gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf);
out_buf->output_blocked_callback(m->output_multiple(), true);
- LOG(std::ostringstream msg; msg << m << " -- NO OUTPUT -- [" << i
- << "] -- OUTPUT BLOCKED CBACK: " << rc;
+ LOG(std::ostringstream msg;
+ msg << m << " -- NO OUTPUT -- [" << i << "] -- OUTPUT BLOCKED CBACK ";
GR_LOG_DEBUG(d_debug_logger, msg.str()););
}
@@ -692,7 +687,6 @@ block_executor::state block_executor::run_one_iteration()
GR_LOG_ERROR(d_logger, "invalid state while going through iteration state machine");
were_done:
- // LOG(GR_LOG_INFO(d_debug_logger, "we're done"););
LOG(std::ostringstream msg; msg << m << " -- we're done";
GR_LOG_INFO(d_debug_logger, msg.str()));
d->set_done(true);
diff --git a/gnuradio-runtime/lib/buffer.cc b/gnuradio-runtime/lib/buffer.cc
index 7fd0a39579..75fc447e8d 100644
--- a/gnuradio-runtime/lib/buffer.cc
+++ b/gnuradio-runtime/lib/buffer.cc
@@ -12,6 +12,7 @@
#include "config.h"
#endif
#include "vmcircbuf.h"
+#include <gnuradio/block.h>
#include <gnuradio/buffer.h>
#include <gnuradio/buffer_double_mapped.h>
#include <gnuradio/buffer_reader.h>
@@ -56,10 +57,11 @@ static long s_buffer_count = 0; // counts for debugging storage mgmt
---------------------------------------------------------------------------- */
-buffer::buffer(BufferMappingType buf_type,
+buffer::buffer(buffer_mapping_type buf_type,
int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
block_sptr link)
: d_base(0),
d_bufsize(0),
@@ -76,7 +78,9 @@ buffer::buffer(BufferMappingType buf_type,
d_callback_flag(false),
d_active_pointer_counter(0),
d_downstream_lcm_nitems(downstream_lcm_nitems),
- d_write_multiple(0)
+ d_write_multiple(0),
+ d_max_reader_output_multiple(downstream_max_out_mult),
+ d_context(buffer_context::DEFAULT_INVALID)
{
gr::configure_default_loggers(d_logger, d_debug_logger, "buffer");
@@ -86,44 +90,27 @@ buffer::buffer(BufferMappingType buf_type,
buffer_sptr make_buffer(int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
block_sptr link,
block_sptr buf_owner)
{
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
gr::logger_ptr logger;
gr::logger_ptr debug_logger;
gr::configure_default_loggers(logger, debug_logger, "make_buffer");
std::ostringstream msg;
#endif
-#if DEBUG_SINGLE_MAPPED
- if (1) {
-#else
- if (buf_owner->get_buffer_type() != buftype_DEFAULT_NON_CUSTOM::get()) {
-#endif
- // Buffer type is NOT the default non custom variety so allocate a
- // buffer_single_mapped instance
-#ifdef BUFFER_DEBUG
- msg << "buffer_single_mapped nitems: " << nitems
- << " -- sizeof_item: " << sizeof_item;
- GR_LOG_DEBUG(logger, msg.str());
-#endif
-
- return buffer_sptr(new buffer_single_mapped(
- nitems, sizeof_item, downstream_lcm_nitems, link, buf_owner));
-
- } else {
- // Default to allocating a buffer_double_mapped instance
-#ifdef BUFFER_DEBUG
- msg << "buffer_double_mapped nitems: " << nitems
- << " -- sizeof_item: " << sizeof_item;
- GR_LOG_DEBUG(logger, msg.str());
-#endif
-
- return buffer_sptr(
- new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link));
- }
+ // NOTE: This function is no longer called by flat_flowgraph functions and
+ // therefore is somewhat deprecated. It will create and return a
+ // buffer_double_mapped subclass by default.
+ buffer_type buftype = buffer_double_mapped::type;
+ return buftype.make_buffer(nitems,
+ sizeof_item,
+ downstream_lcm_nitems,
+ downstream_max_out_mult,
+ link,
+ buf_owner);
}
buffer::~buffer()
@@ -134,12 +121,16 @@ buffer::~buffer()
void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; }
+const void* buffer::_read_pointer(unsigned int read_index)
+{
+ return &d_base[read_index * d_sizeof_item];
+}
+
void buffer::update_write_pointer(int nitems)
{
gr::thread::scoped_lock guard(*mutex());
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
unsigned orig_wr_idx = d_write_index;
#endif
@@ -147,7 +138,6 @@ void buffer::update_write_pointer(int nitems)
d_abs_write_offset += nitems;
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
std::ostringstream msg;
msg << "[" << this << "] update_write_pointer -- orig d_write_index: " << orig_wr_idx
<< " -- nitems: " << nitems << " -- d_write_index: " << d_write_index;
@@ -266,4 +256,21 @@ std::ostream& operator<<(std::ostream& os, const buffer& buf)
return os;
}
+void buffer::set_context(const buffer_context& context)
+{
+ if ((d_context == buffer_context::DEFAULT_INVALID) || (d_context == context)) {
+ // Set the context if the existing value is the default or if it is the
+ // same as what's already been set
+ d_context = context;
+ } else {
+ // Otherwise error out as the context value cannot be changed after
+ // it is set
+ std::ostringstream msg;
+ msg << "Block: " << link()->identifier() << " has context " << d_context
+ << " assigned. Cannot change to context " << context << ".";
+ GR_LOG_ERROR(d_logger, msg.str());
+ throw std::runtime_error(msg.str());
+ }
+}
+
} /* namespace gr */
diff --git a/gnuradio-runtime/lib/buffer_context.cc b/gnuradio-runtime/lib/buffer_context.cc
new file mode 100644
index 0000000000..9cd27add36
--- /dev/null
+++ b/gnuradio-runtime/lib/buffer_context.cc
@@ -0,0 +1,32 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2021 BlackLynx, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+#include <gnuradio/buffer_context.h>
+
+namespace gr {
+
+std::ostream& operator<<(std::ostream& os, const buffer_context& context)
+{
+ switch (context) {
+ case buffer_context::DEFAULT_INVALID:
+ return os << "DEFAULT_INVALID";
+ case buffer_context::HOST_TO_DEVICE:
+ return os << "HOST_TO_DEVICE";
+ case buffer_context::DEVICE_TO_HOST:
+ return os << "DEVICE_TO_HOST";
+ case buffer_context::HOST_TO_HOST:
+ return os << "HOST_TO_HOST";
+ case buffer_context::DEVICE_TO_DEVICE:
+ return os << "DEVICE_TO_DEVICE";
+ default:
+ return os << "Unknown buffer context: " << static_cast<int>(context);
+ }
+}
+
+} // namespace gr
diff --git a/gnuradio-runtime/lib/buffer_double_mapped.cc b/gnuradio-runtime/lib/buffer_double_mapped.cc
index ad99c2162b..700ebac0c1 100644
--- a/gnuradio-runtime/lib/buffer_double_mapped.cc
+++ b/gnuradio-runtime/lib/buffer_double_mapped.cc
@@ -36,23 +36,25 @@ static inline long minimum_buffer_items(long type_size, long page_size)
return page_size / GR_GCD(type_size, page_size);
}
+buffer_type buffer_double_mapped::type(buftype_DEFAULT_NON_CUSTOM{});
buffer_double_mapped::buffer_double_mapped(int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
block_sptr link)
- : buffer(BufferMappingType::DoubleMapped,
+ : buffer(buffer_mapping_type::double_mapped,
nitems,
sizeof_item,
downstream_lcm_nitems,
+ downstream_max_out_mult,
link)
{
gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_double_mapped");
- if (!allocate_buffer(nitems, sizeof_item))
+ if (!allocate_buffer(nitems))
throw std::bad_alloc();
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
{
std::ostringstream msg;
msg << "[" << this << "] "
@@ -62,13 +64,18 @@ buffer_double_mapped::buffer_double_mapped(int nitems,
#endif
}
+// NB: Added the extra 'block_sptr unused' parameter so that the
+// call signature matches the other factory-like functions used to create
+// the buffer_single_mapped subclasses
buffer_sptr make_buffer_double_mapped(int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
- block_sptr link)
+ uint32_t downstream_max_out_mult,
+ block_sptr link,
+ block_sptr unused)
{
- return buffer_sptr(
- new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link));
+ return buffer_sptr(new buffer_double_mapped(
+ nitems, sizeof_item, downstream_lcm_nitems, downstream_max_out_mult, link));
}
buffer_double_mapped::~buffer_double_mapped() {}
@@ -77,13 +84,13 @@ buffer_double_mapped::~buffer_double_mapped() {}
* sets d_vmcircbuf, d_base, d_bufsize.
* returns true iff successful.
*/
-bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item)
+bool buffer_double_mapped::allocate_buffer(int nitems)
{
int orig_nitems = nitems;
// Any buffer size we come up with must be a multiple of min_nitems.
int granularity = gr::vmcircbuf_sysconfig::granularity();
- int min_nitems = minimum_buffer_items(sizeof_item, granularity);
+ int min_nitems = minimum_buffer_items(d_sizeof_item, granularity);
// Round-up nitems to a multiple of min_nitems.
if (nitems % min_nitems != 0)
@@ -91,7 +98,7 @@ bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item)
// If we rounded-up a whole bunch, give the user a heads up.
// This only happens if sizeof_item is not a power of two.
- if (nitems > 2 * orig_nitems && nitems * (int)sizeof_item > granularity) {
+ if (nitems > 2 * orig_nitems && nitems * (int)d_sizeof_item > granularity) {
auto msg =
str(boost::format(
"allocate_buffer: tried to allocate"
@@ -99,7 +106,7 @@ bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item)
" %d were allocated. If this isn't OK, consider padding"
" your structure to a power-of-two bytes."
" On this platform, our allocation granularity is %d bytes.") %
- orig_nitems % sizeof_item % nitems % granularity);
+ orig_nitems % d_sizeof_item % nitems % granularity);
GR_LOG_WARN(d_logger, msg.c_str());
}
@@ -138,7 +145,6 @@ int buffer_double_mapped::space_available()
}
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
std::ostringstream msg;
msg << "[" << this << "] "
<< "space_available() called d_write_index: " << d_write_index
diff --git a/gnuradio-runtime/lib/buffer_reader.cc b/gnuradio-runtime/lib/buffer_reader.cc
index 7ead53032e..b1c1e7fb73 100644
--- a/gnuradio-runtime/lib/buffer_reader.cc
+++ b/gnuradio-runtime/lib/buffer_reader.cc
@@ -34,11 +34,11 @@ buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay
buffer_reader_sptr r;
- if (buf->get_mapping_type() == BufferMappingType::DoubleMapped) {
+ if (buf->get_mapping_type() == buffer_mapping_type::double_mapped) {
r.reset(new buffer_reader(
buf, buf->index_sub(buf->d_write_index, nzero_preload), link));
r->declare_sample_delay(delay);
- } else if (buf->get_mapping_type() == BufferMappingType::SingleMapped) {
+ } else if (buf->get_mapping_type() == buffer_mapping_type::single_mapped) {
r.reset(new buffer_reader_sm(
buf, buf->index_sub(buf->d_write_index, nzero_preload), link));
r->declare_sample_delay(delay);
@@ -51,11 +51,15 @@ buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay
buf->d_readers.push_back(r.get());
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
- std::cerr << " [" << buf.get() << ";" << r.get()
- << "] buffer_add_reader() nzero_preload " << nzero_preload
- << " -- delay: " << delay << " -- history: " << link->history()
- << " -- RD_idx: " << r->d_read_index << std::endl;
+ gr::logger_ptr logger;
+ gr::logger_ptr debug_logger;
+ gr::configure_default_loggers(logger, debug_logger, "buffer_add_reader");
+
+ std::ostringstream msg;
+ msg << " [" << buf.get() << ";" << r.get() << "] buffer_add_reader() nzero_preload "
+ << nzero_preload << " -- delay: " << delay << " -- history: " << link->history()
+ << " -- RD_idx: " << r->d_read_index;
+ GR_LOG_DEBUG(debug_logger, msg.str());
#endif
return r;
@@ -89,12 +93,11 @@ void buffer_reader::declare_sample_delay(unsigned delay)
unsigned buffer_reader::sample_delay() const { return d_attr_delay; }
-int buffer_reader::items_available() // const
+int buffer_reader::items_available() const
{
int available = d_buffer->index_sub(d_buffer->d_write_index, d_read_index);
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
std::ostringstream msg;
msg << "[" << d_buffer << ";" << this << "] "
<< "items_available() WR_idx: " << d_buffer->d_write_index
@@ -109,7 +112,8 @@ int buffer_reader::items_available() // const
const void* buffer_reader::read_pointer()
{
- return &d_buffer->d_base[d_read_index * d_buffer->d_sizeof_item];
+ // Delegate to buffer subclass
+ return d_buffer->_read_pointer(d_read_index);
}
void buffer_reader::update_read_pointer(int nitems)
@@ -117,7 +121,6 @@ void buffer_reader::update_read_pointer(int nitems)
gr::thread::scoped_lock guard(*mutex());
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
unsigned orig_rd_idx = d_read_index;
#endif
@@ -125,7 +128,6 @@ void buffer_reader::update_read_pointer(int nitems)
d_abs_read_offset += nitems;
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
std::ostringstream msg;
msg << "[" << d_buffer << ";" << this
<< "] update_read_pointer -- orig d_read_index: " << orig_rd_idx
diff --git a/gnuradio-runtime/lib/buffer_reader_sm.cc b/gnuradio-runtime/lib/buffer_reader_sm.cc
index 9e0fac584f..c0d4b07d65 100644
--- a/gnuradio-runtime/lib/buffer_reader_sm.cc
+++ b/gnuradio-runtime/lib/buffer_reader_sm.cc
@@ -26,13 +26,14 @@ namespace gr {
buffer_reader_sm::~buffer_reader_sm() {}
-int buffer_reader_sm::items_available()
+int buffer_reader_sm::items_available() const
{
int available = 0;
if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) {
if (d_buffer->d_write_index == d_read_index) {
- if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) {
+ if ((nitems_read() - sample_delay()) !=
+ (d_buffer->nitems_written() + link()->history() - 1)) {
available = d_buffer->d_bufsize - d_read_index;
}
} else {
@@ -55,87 +56,15 @@ int buffer_reader_sm::items_available()
bool buffer_reader_sm::input_blkd_cb_ready(int items_required) const
{
- gr::thread::scoped_lock(*d_buffer->mutex());
-
- return (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) &&
- (d_buffer->d_write_index < d_read_index));
+ return d_buffer->input_blkd_cb_ready(items_required, d_read_index);
}
bool buffer_reader_sm::input_blocked_callback(int items_required, int items_avail)
{
- // Maybe adjust read pointers from min read index?
- // This would mean that *all* readers must be > (passed) the write index
- if (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) &&
- (d_buffer->d_write_index < d_read_index)) {
-
- // Update items available before going farther as it could be stale
- items_avail = items_available();
-
- // Find reader with the smallest read index that is greater than the
- // write index
- uint32_t min_reader_index = std::numeric_limits<uint32_t>::max();
- uint32_t min_read_idx = std::numeric_limits<uint32_t>::max();
- for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
- if (d_buffer->d_readers[idx]->d_read_index > d_buffer->d_write_index) {
- // Record index of reader with minimum read-index
- if (d_buffer->d_readers[idx]->d_read_index < min_read_idx) {
- min_read_idx = d_buffer->d_readers[idx]->d_read_index;
- min_reader_index = idx;
- }
- }
- }
-
- // Note items_avail might be zero, that's okay.
- items_avail += d_read_index - min_read_idx;
- int gap = min_read_idx - d_buffer->d_write_index;
- if (items_avail > gap) {
- return false;
- }
-
-#ifdef BUFFER_DEBUG
- std::ostringstream msg;
- msg << "[" << d_buffer << ";" << this << "] "
- << "input_blocked_callback() WR_idx: " << d_buffer->d_write_index
- << " -- WR items: " << d_buffer->nitems_written()
- << " -- BUFSIZE: " << d_buffer->d_bufsize << " -- RD_idx: " << min_read_idx;
- for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
- if (idx != min_reader_index) {
- msg << " -- OTHER_RDR: " << d_buffer->d_readers[idx]->d_read_index;
- }
- }
-
- msg << " -- GAP: " << gap << " -- items_required: " << items_required
- << " -- items_avail: " << items_avail;
- GR_LOG_DEBUG(d_logger, msg.str());
-#endif
-
- // Shift existing data down to make room for blocked data at end of buffer
- uint32_t move_data_size = d_buffer->d_write_index * d_buffer->d_sizeof_item;
- char* dest = d_buffer->d_base + (items_avail * d_buffer->d_sizeof_item);
- std::memmove(dest, d_buffer->d_base, move_data_size);
-
- // Next copy the data from the end of the buffer back to the beginning
- uint32_t avail_data_size = items_avail * d_buffer->d_sizeof_item;
- char* src = d_buffer->d_base + (min_read_idx * d_buffer->d_sizeof_item);
- std::memcpy(d_buffer->d_base, src, avail_data_size);
-
- // Now adjust write pointer
- d_buffer->d_write_index += items_avail;
-
- // Finally adjust all reader pointers
- for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
- if (idx == min_reader_index) {
- d_buffer->d_readers[idx]->d_read_index = 0;
- } else {
- d_buffer->d_readers[idx]->d_read_index += items_avail;
- d_buffer->d_readers[idx]->d_read_index %= d_buffer->d_bufsize;
- }
- }
-
- return true;
- }
+ // Update items available before going farther as it could be stale
+ items_avail = items_available();
- return false;
+ return d_buffer->input_blocked_callback(items_required, items_avail, d_read_index);
}
buffer_reader_sm::buffer_reader_sm(buffer_sptr buffer,
diff --git a/gnuradio-runtime/lib/buffer_single_mapped.cc b/gnuradio-runtime/lib/buffer_single_mapped.cc
index 6024396557..db7959c5b4 100644
--- a/gnuradio-runtime/lib/buffer_single_mapped.cc
+++ b/gnuradio-runtime/lib/buffer_single_mapped.cc
@@ -19,6 +19,8 @@
#include <gnuradio/thread/thread.h>
#include <assert.h>
#include <algorithm>
+#include <cstdlib>
+#include <cstring>
#include <iostream>
#include <stdexcept>
@@ -27,30 +29,18 @@ namespace gr {
buffer_single_mapped::buffer_single_mapped(int nitems,
size_t sizeof_item,
uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
block_sptr link,
block_sptr buf_owner)
- : buffer(BufferMappingType::SingleMapped,
+ : buffer(buffer_mapping_type::single_mapped,
nitems,
sizeof_item,
downstream_lcm_nitems,
+ downstream_max_out_mult,
link),
d_buf_owner(buf_owner),
- d_buffer(nullptr,
- std::bind(&buffer_single_mapped::deleter, this, std::placeholders::_1))
+ d_buffer(nullptr)
{
- gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_single_mapped");
- if (!allocate_buffer(nitems, sizeof_item, downstream_lcm_nitems))
- throw std::bad_alloc();
-
-#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
- {
- std::ostringstream msg;
- msg << "[" << this << "] "
- << "buffer_single_mapped constructor -- history: " << link->history();
- GR_LOG_DEBUG(d_logger, msg.str());
- }
-#endif
}
buffer_single_mapped::~buffer_single_mapped() {}
@@ -59,14 +49,21 @@ buffer_single_mapped::~buffer_single_mapped() {}
* Allocates underlying buffer.
* returns true iff successful.
*/
-bool buffer_single_mapped::allocate_buffer(int nitems,
- size_t sizeof_item,
- uint64_t downstream_lcm_nitems)
+bool buffer_single_mapped::allocate_buffer(int nitems)
{
#ifdef BUFFER_DEBUG
int orig_nitems = nitems;
#endif
+ // For single mapped buffers resize the initial size to be at least four
+ // times the size of the largest of any downstream block's output multiple.
+ // This helps reduce the number of times the input_block_callback() might be
+ // called which should help overall performance (particularly if the max
+ // output multiple is large) at the cost of slightly more buffer space.
+ if (static_cast<uint32_t>(nitems) < (4 * d_max_reader_output_multiple)) {
+ nitems = 4 * d_max_reader_output_multiple;
+ }
+
// Unlike the double mapped buffer case that can easily wrap back onto itself
// for both reads and writes the single mapped case needs to be aware of read
// and write granularity and size the underlying buffer accordingly. Otherwise
@@ -105,33 +102,37 @@ bool buffer_single_mapped::allocate_buffer(int nitems,
#endif
// Adjust size so output buffer size is a multiple of the write granularity
- if (write_granularity != 1 || downstream_lcm_nitems != 1) {
- uint64_t size_align_adjust = GR_LCM(write_granularity, downstream_lcm_nitems);
+ if (write_granularity != 1 || d_downstream_lcm_nitems != 1) {
+ uint64_t size_align_adjust = GR_LCM(write_granularity, d_downstream_lcm_nitems);
uint64_t remainder = nitems % size_align_adjust;
nitems += (remainder > 0) ? (size_align_adjust - remainder) : 0;
#ifdef BUFFER_DEBUG
std::ostringstream msg;
msg << "allocate_buffer()** called nitems: " << orig_nitems
- << " -- read_multiple: " << downstream_lcm_nitems
+ << " -- read_multiple: " << d_downstream_lcm_nitems
<< " -- write_multiple: " << write_granularity
<< " -- NEW nitems: " << nitems;
GR_LOG_DEBUG(d_logger, msg.str());
#endif
}
- // Allocate a new custom buffer from the owning block
- char* buf = buf_owner()->allocate_custom_buffer(nitems * sizeof_item);
- assert(buf != nullptr);
- d_buffer.reset(buf);
-
- d_base = d_buffer.get();
d_bufsize = nitems;
- d_downstream_lcm_nitems = downstream_lcm_nitems;
+ d_downstream_lcm_nitems = d_downstream_lcm_nitems;
d_write_multiple = write_granularity;
- return true;
+ // Do the actual allocation(s) with the finalized nitems
+ return do_allocate_buffer(nitems, d_sizeof_item);
+}
+
+bool buffer_single_mapped::input_blkd_cb_ready(int items_required,
+ unsigned int read_index)
+{
+ gr::thread::scoped_lock(*this->mutex());
+
+ return (((d_bufsize - read_index) < (uint32_t)items_required) &&
+ (d_write_index < read_index));
}
bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple)
@@ -145,70 +146,6 @@ bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple)
((space_avail / output_multiple) * output_multiple == 0));
}
-
-bool buffer_single_mapped::output_blocked_callback(int output_multiple, bool force)
-{
- uint32_t space_avail = space_available();
-
-#ifdef BUFFER_DEBUG
- std::ostringstream msg;
- msg << "[" << this << "] "
- << "output_blocked_callback()*** WR_idx: " << d_write_index
- << " -- WR items: " << nitems_written()
- << " -- output_multiple: " << output_multiple
- << " -- space_avail: " << space_avail << " -- force: " << force;
- GR_LOG_DEBUG(d_logger, msg.str());
-#endif
-
- if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) ||
- force) {
- // Find reader with the smallest read index
- uint32_t min_read_idx = d_readers[0]->d_read_index;
- for (size_t idx = 1; idx < d_readers.size(); ++idx) {
- // Record index of reader with minimum read-index
- if (d_readers[idx]->d_read_index < min_read_idx) {
- min_read_idx = d_readers[idx]->d_read_index;
- }
- }
-
-#ifdef BUFFER_DEBUG
- std::ostringstream msg;
- msg << "[" << this << "] "
- << "output_blocked_callback() WR_idx: " << d_write_index
- << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx
- << " -- shortcircuit: "
- << ((min_read_idx == 0) || (min_read_idx >= d_write_index))
- << " -- to_move_items: " << (d_write_index - min_read_idx)
- << " -- space_avail: " << space_avail << " -- force: " << force;
- GR_LOG_DEBUG(d_logger, msg.str());
-#endif
-
- // Make sure we have enough room to start writing back at the beginning
- if ((min_read_idx == 0) || (min_read_idx >= d_write_index)) {
- return false;
- }
-
- // Determine how much "to be read" data needs to be moved
- int to_move_items = d_write_index - min_read_idx;
- assert(to_move_items > 0);
- uint32_t to_move_bytes = to_move_items * d_sizeof_item;
-
- // Shift "to be read" data back to the beginning of the buffer
- std::memmove(d_base, d_base + (min_read_idx * d_sizeof_item), to_move_bytes);
-
- // Adjust write index and each reader index
- d_write_index -= min_read_idx;
-
- for (size_t idx = 0; idx < d_readers.size(); ++idx) {
- d_readers[idx]->d_read_index -= min_read_idx;
- }
-
- return true;
- }
-
- return false;
-}
-
int buffer_single_mapped::space_available()
{
if (d_readers.empty())
@@ -233,7 +170,7 @@ int buffer_single_mapped::space_available()
// For single mapped buffer there is no wrapping beyond the end of the
// buffer
#ifdef BUFFER_DEBUG
- int thecase = 4; // REMOVE ME - just for debug
+ int thecase = 0;
#endif
int space = d_bufsize - d_write_index;
@@ -259,15 +196,17 @@ int buffer_single_mapped::space_available()
#endif
space = min_read_index - d_write_index;
// Leave extra space in case the reader gets stuck and needs realignment
- {
- if ((d_write_index > (d_bufsize / 2)) ||
- (min_read_index < (d_bufsize / 2))) {
+ if (d_max_reader_output_multiple > 1) {
+ if (static_cast<uint32_t>(space) > d_max_reader_output_multiple) {
#ifdef BUFFER_DEBUG
- thecase = 17;
+ thecase = 4;
#endif
- space = 0;
+ space = space - d_max_reader_output_multiple;
} else {
- space = (d_bufsize / 2) - d_write_index;
+#ifdef BUFFER_DEBUG
+ thecase = 5;
+#endif
+ space = 0;
}
}
}
@@ -278,7 +217,6 @@ int buffer_single_mapped::space_available()
}
#ifdef BUFFER_DEBUG
- // BUFFER DEBUG
std::ostringstream msg;
msg << "[" << this << "] "
<< "space_available() called (case: " << thecase
@@ -286,6 +224,7 @@ int buffer_single_mapped::space_available()
<< " -- min_read_index: " << min_read_index << " ("
<< min_idx_reader->nitems_read() << ") "
<< " -- space: " << space
+ << " -- max_reader_out_mult: " << d_max_reader_output_multiple
<< " (sample delay: " << min_idx_reader->sample_delay() << ")";
GR_LOG_DEBUG(d_logger, msg.str());
#endif
@@ -294,4 +233,198 @@ int buffer_single_mapped::space_available()
}
}
+void buffer_single_mapped::update_reader_block_history(unsigned history, int delay)
+{
+ unsigned old_max = d_max_reader_history;
+ d_max_reader_history = std::max(d_max_reader_history, history);
+ if (d_max_reader_history != old_max) {
+ d_write_index = d_max_reader_history - 1;
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "buffer_single_mapped constructor -- set wr index to: " << d_write_index;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // Reset the reader's read index if the buffer's write index has changed.
+ // Note that "history - 1" is the nzero_preload value passed to
+ // buffer_add_reader.
+ for (auto reader : d_readers) {
+ reader->d_read_index = d_write_index - (reader->link()->history() - 1);
+ }
+ }
+
+ // Only attempt to set has history flag if it is not already set
+ if (!d_has_history) {
+ // Blocks that set delay may set history to delay + 1 but this is
+ // not "real" history
+ d_has_history = ((static_cast<int>(history) - 1) != delay);
+ }
+}
+
+//------------------------------------------------------------------------------
+
+bool buffer_single_mapped::input_blocked_callback_logic(int items_required,
+ int items_avail,
+ unsigned read_index,
+ char* buffer_ptr,
+ memcpy_func_t memcpy_func,
+ memmove_func_t memmove_func)
+{
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "input_blocked_callback() WR_idx: " << d_write_index
+ << " -- WR items: " << nitems_written() << " -- BUFSIZE: " << d_bufsize
+ << " -- RD_idx: " << read_index << " -- items_required: " << items_required
+ << " -- items_avail: " << items_avail;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // Maybe adjust read pointers from min read index?
+ // This would mean that *all* readers must be > (passed) the write index
+ if (((d_bufsize - read_index) < (uint32_t)items_required) &&
+ (d_write_index < read_index)) {
+
+ // Find reader with the smallest read index that is greater than the
+ // write index
+ uint32_t min_reader_index = std::numeric_limits<uint32_t>::max();
+ uint32_t min_read_idx = std::numeric_limits<uint32_t>::max();
+ for (size_t idx = 0; idx < d_readers.size(); ++idx) {
+ if (d_readers[idx]->d_read_index > d_write_index) {
+ // Record index of reader with minimum read-index
+ if (d_readers[idx]->d_read_index < min_read_idx) {
+ min_read_idx = d_readers[idx]->d_read_index;
+ min_reader_index = idx;
+ }
+ }
+ }
+
+ // Note items_avail might be zero, that's okay.
+ items_avail += read_index - min_read_idx;
+ int gap = min_read_idx - d_write_index;
+ if (items_avail > gap) {
+ return false;
+ }
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "input_blocked_callback() WR_idx: " << d_write_index
+ << " -- WR items: " << nitems_written() << " -- BUFSIZE: " << d_bufsize
+ << " -- RD_idx: " << min_read_idx;
+ for (size_t idx = 0; idx < d_readers.size(); ++idx) {
+ if (idx != min_reader_index) {
+ msg << " -- OTHER_RDR: " << d_readers[idx]->d_read_index;
+ }
+ }
+
+ msg << " -- GAP: " << gap << " -- items_required: " << items_required
+ << " -- items_avail: " << items_avail;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // Shift existing data down to make room for blocked data at end of buffer
+ uint32_t move_data_size = d_write_index * d_sizeof_item;
+ char* dest = buffer_ptr + (items_avail * d_sizeof_item);
+ memmove_func(dest, buffer_ptr, move_data_size);
+
+ // Next copy the data from the end of the buffer back to the beginning
+ uint32_t avail_data_size = items_avail * d_sizeof_item;
+ char* src = buffer_ptr + (min_read_idx * d_sizeof_item);
+ memcpy_func(buffer_ptr, src, avail_data_size);
+
+ // Now adjust write pointer
+ d_write_index += items_avail;
+
+ // Finally adjust all reader pointers
+ for (size_t idx = 0; idx < d_readers.size(); ++idx) {
+ if (idx == min_reader_index) {
+ d_readers[idx]->d_read_index = 0;
+ } else {
+ d_readers[idx]->d_read_index += items_avail;
+ d_readers[idx]->d_read_index %= d_bufsize;
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+}
+
+bool buffer_single_mapped::output_blocked_callback_logic(int output_multiple,
+ bool force,
+ char* buffer_ptr,
+ memmove_func_t memmove_func)
+{
+ uint32_t space_avail = space_available();
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "output_blocked_callback()*** WR_idx: " << d_write_index
+ << " -- WR items: " << nitems_written()
+ << " -- output_multiple: " << output_multiple
+ << " -- space_avail: " << space_avail << " -- force: " << force;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) ||
+ force) {
+ // Find reader with the smallest read index
+ uint32_t min_read_idx = d_readers[0]->d_read_index;
+ uint64_t min_read_idx_nitems = d_readers[0]->nitems_read();
+ for (size_t idx = 1; idx < d_readers.size(); ++idx) {
+ // Record index of reader with minimum read-index
+ if (d_readers[idx]->d_read_index < min_read_idx) {
+ min_read_idx = d_readers[idx]->d_read_index;
+ min_read_idx_nitems = d_readers[idx]->nitems_read();
+ }
+ }
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "output_blocked_callback() WR_idx: " << d_write_index
+ << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx
+ << " -- RD items: " << min_read_idx_nitems << " -- shortcircuit: "
+ << ((min_read_idx == 0) || (min_read_idx > d_write_index) ||
+ (min_read_idx == d_write_index &&
+ min_read_idx_nitems != nitems_written()))
+ << " -- to_move_items: " << (d_write_index - min_read_idx)
+ << " -- space_avail: " << space_avail << " -- force: " << force;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // Make sure we have enough room to start writing back at the beginning
+ if ((min_read_idx == 0) || (min_read_idx > d_write_index) ||
+ (min_read_idx == d_write_index && min_read_idx_nitems != nitems_written())) {
+ return false;
+ }
+
+ // Determine how much "to be read" data needs to be moved
+ int to_move_items = d_write_index - min_read_idx;
+ if (to_move_items > 0) {
+ uint32_t to_move_bytes = to_move_items * d_sizeof_item;
+
+ // Shift "to be read" data back to the beginning of the buffer
+ memmove_func(
+ buffer_ptr, buffer_ptr + (min_read_idx * d_sizeof_item), to_move_bytes);
+ }
+
+ // Adjust write index and each reader index
+ d_write_index -= min_read_idx;
+
+ for (size_t idx = 0; idx < d_readers.size(); ++idx) {
+ d_readers[idx]->d_read_index -= min_read_idx;
+ }
+
+ return true;
+ }
+
+ return false;
+}
+
} /* namespace gr */
diff --git a/gnuradio-runtime/lib/buffer_type.cc b/gnuradio-runtime/lib/buffer_type.cc
deleted file mode 100644
index b667eded1e..0000000000
--- a/gnuradio-runtime/lib/buffer_type.cc
+++ /dev/null
@@ -1,18 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2020 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-#include <gnuradio/buffer_type.h>
-
-namespace gr {
-
-uint32_t buffer_type_base::s_nextId = 0;
-std::mutex buffer_type_base::s_mutex;
-
-
-} /* namespace gr */ \ No newline at end of file
diff --git a/gnuradio-runtime/lib/flat_flowgraph.cc b/gnuradio-runtime/lib/flat_flowgraph.cc
index b9986f8370..ef96ef21ac 100644
--- a/gnuradio-runtime/lib/flat_flowgraph.cc
+++ b/gnuradio-runtime/lib/flat_flowgraph.cc
@@ -15,6 +15,7 @@
#include "flat_flowgraph.h"
#include <gnuradio/block_detail.h>
#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_double_mapped.h>
#include <gnuradio/buffer_reader.h>
#include <gnuradio/buffer_type.h>
#include <gnuradio/integer_math.h>
@@ -87,6 +88,7 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
// Determine the downstream max per output port
std::vector<int> downstream_max_nitems(noutputs, 0);
std::vector<uint64_t> downstream_lcm_nitems(noutputs, 1);
+ std::vector<uint32_t> downstream_max_out_mult(noutputs, 1);
#ifdef BUFFER_DEBUG
std::ostringstream msg;
@@ -96,6 +98,7 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
for (int i = 0; i < noutputs; i++) {
int nitems = 0;
uint64_t lcm_nitems = 1;
+ uint32_t max_out_multiple = 1;
basic_block_vector_t downstream_blocks = calc_downstream_blocks(grblock, i);
for (basic_block_viter_t blk = downstream_blocks.begin();
blk != downstream_blocks.end();
@@ -116,8 +119,8 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
double decimation = (1.0 / dgrblock->relative_rate());
int multiple = dgrblock->output_multiple();
int history = dgrblock->history();
- nitems =
- std::max(nitems, static_cast<int>(2 * (decimation * multiple + history)));
+ nitems = std::max(
+ nitems, static_cast<int>(2 * (decimation * multiple + (history - 1))));
// Calculate the LCM of downstream reader nitems
#ifdef BUFFER_DEBUG
@@ -131,6 +134,7 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
(uint64_t)(dgrblock->fixed_rate_noutput_to_ninput(1) -
(dgrblock->history() - 1)));
}
+
if (dgrblock->relative_rate() != 1.0) {
// Relative rate
lcm_nitems = GR_LCM(lcm_nitems, dgrblock->relative_rate_d());
@@ -141,6 +145,10 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
lcm_nitems = 1;
}
+ if (static_cast<uint32_t>(multiple) > max_out_multiple) {
+ max_out_multiple = multiple;
+ }
+
#ifdef BUFFER_DEBUG
msg.str("");
msg << " NINPUT_ITEMS: " << nitems;
@@ -161,11 +169,15 @@ void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
}
downstream_max_nitems[i] = nitems;
downstream_lcm_nitems[i] = lcm_nitems;
+ downstream_max_out_mult[i] = max_out_multiple;
}
// Allocate the block detail and necessary buffers
- grblock->allocate_detail(
- ninputs, noutputs, downstream_max_nitems, downstream_lcm_nitems);
+ grblock->allocate_detail(ninputs,
+ noutputs,
+ downstream_max_nitems,
+ downstream_lcm_nitems,
+ downstream_max_out_mult);
}
void flat_flowgraph::connect_block_inputs(basic_block_sptr block)
@@ -189,26 +201,46 @@ void flat_flowgraph::connect_block_inputs(basic_block_sptr block)
if (!src_grblock)
throw std::runtime_error("connect_block_inputs found non-gr::block");
+ // In order to determine the buffer context, we need to examine both
+ // the upstream and the downstream buffer_types
+ buffer_type src_buf_type =
+ src_grblock->output_signature()->stream_buffer_type(src_port);
+ buffer_type dest_buf_type =
+ grblock->input_signature()->stream_buffer_type(dst_port);
+
+ buffer_context context;
+ if (src_buf_type == buffer_double_mapped::type &&
+ dest_buf_type == buffer_double_mapped::type) {
+ context = buffer_context::HOST_TO_HOST;
+ } else if (src_buf_type != buffer_double_mapped::type &&
+ dest_buf_type == buffer_double_mapped::type) {
+ context = buffer_context::DEVICE_TO_HOST;
+ } else if (src_buf_type == buffer_double_mapped::type &&
+ dest_buf_type != buffer_double_mapped::type) {
+ context = buffer_context::HOST_TO_DEVICE;
+ } else if (src_buf_type != buffer_double_mapped::type &&
+ dest_buf_type != buffer_double_mapped::type) {
+ context = buffer_context::DEVICE_TO_DEVICE;
+ }
+
buffer_sptr src_buffer;
- buffer_type_t src_buf_type = src_grblock->get_buffer_type();
- buffer_type_t dest_buf_type = grblock->get_buffer_type();
- if (dest_buf_type == buftype_DEFAULT_NON_CUSTOM::get() ||
+ if (dest_buf_type == buffer_double_mapped::type ||
dest_buf_type == src_buf_type) {
// The block is not using a custom buffer OR the block and the upstream
// block both use the same kind of custom buffer
src_buffer = src_grblock->detail()->output(src_port);
} else {
- if (dest_buf_type != buftype_DEFAULT_NON_CUSTOM::get() &&
- src_buf_type == buftype_DEFAULT_NON_CUSTOM::get()) {
+ if (dest_buf_type != buffer_double_mapped::type &&
+ src_buf_type == buffer_double_mapped::type) {
// The block uses a custom buffer but the upstream block does not
// therefore the upstream block's buffer can be replaced with the
// type of buffer that the block needs
std::ostringstream msg;
msg << "Block: " << grblock->identifier()
- << "replacing upstream block: " << src_grblock->identifier()
+ << " replacing upstream block: " << src_grblock->identifier()
<< " buffer with a custom buffer";
- GR_LOG_DEBUG(d_debug_logger, msg.str());
- src_buffer = src_grblock->replace_buffer(src_port, grblock);
+ GR_LOG_DEBUG(d_logger, msg.str());
+ src_buffer = src_grblock->replace_buffer(src_port, dst_port, grblock);
} else {
// Both the block and upstream block use incompatible buffer types
// which is not currently allowed
@@ -223,9 +255,13 @@ void flat_flowgraph::connect_block_inputs(basic_block_sptr block)
}
}
- GR_LOG_DEBUG(d_debug_logger,
- "Setting input " + std::to_string(dst_port) + " from edge " +
- (*e).identifier());
+ // Set buffer's context
+ src_buffer->set_context(context);
+
+ std::ostringstream msg;
+ msg << "Setting input " << dst_port << " from edge " << (*e).identifier()
+ << " context: " << context;
+ GR_LOG_DEBUG(d_debug_logger, msg.str());
detail->set_input(dst_port,
buffer_add_reader(src_buffer,
@@ -341,7 +377,6 @@ void flat_flowgraph::merge_connections(flat_flowgraph_sptr old_ffg)
// Now deal with the fact that the block details might have
// changed numbers of inputs and outputs vs. in the old
// flowgraph.
-
block->detail()->reset_nitem_counters();
block->detail()->clear_tags();
}
diff --git a/gnuradio-runtime/lib/host_buffer.cc b/gnuradio-runtime/lib/host_buffer.cc
new file mode 100644
index 0000000000..6e1a1d1003
--- /dev/null
+++ b/gnuradio-runtime/lib/host_buffer.cc
@@ -0,0 +1,255 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2021 BlackLynx Inc..
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ */
+
+#include <cstring>
+#include <sstream>
+#include <stdexcept>
+
+#include <gnuradio/block.h>
+#include <gnuradio/host_buffer.h>
+
+namespace gr {
+
+buffer_type host_buffer::type(buftype_HOST_BUFFER{});
+
+void* host_buffer::device_memcpy(void* dest, const void* src, std::size_t count)
+{
+ // There is no spoon...er... device so fake it out using regular memcpy
+ return std::memcpy(dest, src, count);
+}
+
+void* host_buffer::device_memmove(void* dest, const void* src, std::size_t count)
+{
+ // There is no spoon...er... device so fake it out using regular memmmove
+ return std::memmove(dest, src, count);
+}
+
+
+host_buffer::host_buffer(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
+ block_sptr link,
+ block_sptr buf_owner)
+ : buffer_single_mapped(nitems,
+ sizeof_item,
+ downstream_lcm_nitems,
+ downstream_max_out_mult,
+ link,
+ buf_owner),
+ d_device_base(nullptr)
+{
+ gr::configure_default_loggers(d_logger, d_debug_logger, "host_buffer");
+ if (!allocate_buffer(nitems))
+ throw std::bad_alloc();
+}
+
+host_buffer::~host_buffer() {}
+
+void host_buffer::post_work(int nitems)
+{
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "host_buffer [" << d_context << "] -- post_work: " << nitems;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ if (nitems <= 0) {
+ return;
+ }
+
+ // NOTE: when this function is called the write pointer has not yet been
+ // advanced so it can be used directly as the source ptr
+ switch (d_context) {
+ case buffer_context::HOST_TO_DEVICE: {
+ // Copy data from host buffer to device buffer
+ void* dest_ptr = &d_device_base[d_write_index * d_sizeof_item];
+ device_memcpy(dest_ptr, write_pointer(), nitems * d_sizeof_item);
+ } break;
+
+ case buffer_context::DEVICE_TO_HOST: {
+ // Copy data from device buffer to host buffer
+ void* dest_ptr = &d_base[d_write_index * d_sizeof_item];
+ device_memcpy(dest_ptr, write_pointer(), nitems * d_sizeof_item);
+ } break;
+
+ case buffer_context::DEVICE_TO_DEVICE:
+ // No op
+ break;
+
+ default:
+ std::ostringstream msg;
+ msg << "Unexpected context for host_buffer: " << d_context;
+ GR_LOG_ERROR(d_logger, msg.str());
+ throw std::runtime_error(msg.str());
+ }
+}
+
+bool host_buffer::do_allocate_buffer(size_t final_nitems, size_t sizeof_item)
+{
+#ifdef BUFFER_DEBUG
+ {
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "host_buffer constructor -- nitems: " << final_nitems;
+ GR_LOG_DEBUG(d_logger, msg.str());
+ }
+#endif
+
+ // This is the host buffer
+ d_buffer.reset(new char[final_nitems * sizeof_item]());
+ d_base = d_buffer.get();
+
+ // This is the simulated device buffer
+ d_device_buf.reset(new char[final_nitems * sizeof_item]());
+ d_device_base = d_device_buf.get();
+
+ return true;
+}
+
+void* host_buffer::write_pointer()
+{
+ void* ptr = nullptr;
+ switch (d_context) {
+ case buffer_context::HOST_TO_DEVICE:
+ // Write into host buffer
+ ptr = &d_base[d_write_index * d_sizeof_item];
+ break;
+
+ case buffer_context::DEVICE_TO_HOST:
+ case buffer_context::DEVICE_TO_DEVICE:
+ // Write into "device" buffer
+ ptr = &d_device_base[d_write_index * d_sizeof_item];
+ break;
+
+ default:
+ std::ostringstream msg;
+ msg << "Unexpected context for host_buffer: " << d_context;
+ GR_LOG_ERROR(d_logger, msg.str());
+ throw std::runtime_error(msg.str());
+ }
+
+ return ptr;
+}
+
+const void* host_buffer::_read_pointer(unsigned int read_index)
+{
+ void* ptr = nullptr;
+ switch (d_context) {
+ case buffer_context::HOST_TO_DEVICE:
+ case buffer_context::DEVICE_TO_DEVICE:
+ // Read from "device" buffer
+ ptr = &d_device_base[read_index * d_sizeof_item];
+ break;
+
+ case buffer_context::DEVICE_TO_HOST:
+ // Read from host buffer
+ ptr = &d_base[read_index * d_sizeof_item];
+ break;
+
+ default:
+ std::ostringstream msg;
+ msg << "Unexpected context for host_buffer: " << d_context;
+ GR_LOG_ERROR(d_logger, msg.str());
+ throw std::runtime_error(msg.str());
+ }
+
+ return ptr;
+}
+
+bool host_buffer::input_blocked_callback(int items_required,
+ int items_avail,
+ unsigned read_index)
+{
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "host_buffer [" << d_context << "] -- input_blocked_callback";
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ bool rc = false;
+ switch (d_context) {
+ case buffer_context::HOST_TO_DEVICE:
+ case buffer_context::DEVICE_TO_DEVICE:
+ // Adjust "device" buffer
+ rc = input_blocked_callback_logic(items_required,
+ items_avail,
+ read_index,
+ d_device_base,
+ host_buffer::device_memcpy,
+ host_buffer::device_memmove);
+ break;
+
+ case buffer_context::DEVICE_TO_HOST:
+ case buffer_context::HOST_TO_HOST:
+ // Adjust host buffer
+ rc = input_blocked_callback_logic(
+ items_required, items_avail, read_index, d_base, std::memcpy, std::memmove);
+ break;
+
+ default:
+ std::ostringstream msg;
+ msg << "Unexpected context for host_buffer: " << d_context;
+ GR_LOG_ERROR(d_logger, msg.str());
+ throw std::runtime_error(msg.str());
+ }
+
+ return rc;
+}
+
+bool host_buffer::output_blocked_callback(int output_multiple, bool force)
+{
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "host_buffer [" << d_context << "] -- output_blocked_callback";
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ bool rc = false;
+ switch (d_context) {
+ case buffer_context::HOST_TO_DEVICE:
+ case buffer_context::HOST_TO_HOST:
+ // Adjust host buffer
+ rc = output_blocked_callback_logic(output_multiple, force, d_base, std::memmove);
+ break;
+
+ case buffer_context::DEVICE_TO_HOST:
+ case buffer_context::DEVICE_TO_DEVICE:
+ // Adjust "device" buffer
+ rc = output_blocked_callback_logic(
+ output_multiple, force, d_device_base, host_buffer::device_memmove);
+ break;
+
+ default:
+ std::ostringstream msg;
+ msg << "Unexpected context for host_buffer: " << d_context;
+ GR_LOG_ERROR(d_logger, msg.str());
+ throw std::runtime_error(msg.str());
+ }
+
+ return rc;
+}
+
+buffer_sptr host_buffer::make_host_buffer(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ uint32_t downstream_max_out_mult,
+ block_sptr link,
+ block_sptr buf_owner)
+{
+ return buffer_sptr(new host_buffer(nitems,
+ sizeof_item,
+ downstream_lcm_nitems,
+ downstream_max_out_mult,
+ link,
+ buf_owner));
+}
+
+} /* namespace gr */
diff --git a/gnuradio-runtime/lib/io_signature.cc b/gnuradio-runtime/lib/io_signature.cc
index 507268d5ce..e953c7eb5a 100644
--- a/gnuradio-runtime/lib/io_signature.cc
+++ b/gnuradio-runtime/lib/io_signature.cc
@@ -22,47 +22,69 @@ gr::io_signature::sptr io_signature::makev(int min_streams,
int max_streams,
const std::vector<int>& sizeof_stream_items)
{
+ gr_vector_buffer_type buftypes(sizeof_stream_items.size(),
+ buffer_double_mapped::type);
return gr::io_signature::sptr(
- new io_signature(min_streams, max_streams, sizeof_stream_items));
+ new io_signature(min_streams, max_streams, sizeof_stream_items, buftypes));
}
-gr::io_signature::sptr
-io_signature::make(int min_streams, int max_streams, int sizeof_stream_item)
+gr::io_signature::sptr io_signature::makev(int min_streams,
+ int max_streams,
+ const std::vector<int>& sizeof_stream_items,
+ gr_vector_buffer_type buftypes)
{
- std::vector<int> sizeof_items(1);
- sizeof_items[0] = sizeof_stream_item;
- return io_signature::makev(min_streams, max_streams, sizeof_items);
+ return gr::io_signature::sptr(
+ new io_signature(min_streams, max_streams, sizeof_stream_items, buftypes));
+}
+
+gr::io_signature::sptr io_signature::make(int min_streams,
+ int max_streams,
+ int sizeof_stream_item,
+ buffer_type buftype)
+{
+ std::vector<int> sizeof_items{ sizeof_stream_item };
+ gr_vector_buffer_type buftypes{ buftype };
+ return io_signature::makev(min_streams, max_streams, sizeof_items, buftypes);
}
gr::io_signature::sptr io_signature::make2(int min_streams,
int max_streams,
int sizeof_stream_item1,
- int sizeof_stream_item2)
+ int sizeof_stream_item2,
+ buffer_type buftype1,
+ buffer_type buftype2)
{
- std::vector<int> sizeof_items(2);
- sizeof_items[0] = sizeof_stream_item1;
- sizeof_items[1] = sizeof_stream_item2;
- return io_signature::makev(min_streams, max_streams, sizeof_items);
+ std::vector<int> sizeof_items{ sizeof_stream_item1, sizeof_stream_item2 };
+ gr_vector_buffer_type buftypes{ buftype1, buftype2 };
+ return io_signature::makev(min_streams, max_streams, sizeof_items, buftypes);
}
gr::io_signature::sptr io_signature::make3(int min_streams,
int max_streams,
int sizeof_stream_item1,
int sizeof_stream_item2,
- int sizeof_stream_item3)
+ int sizeof_stream_item3,
+ buffer_type buftype1,
+ buffer_type buftype2,
+ buffer_type buftype3)
{
- std::vector<int> sizeof_items(3);
- sizeof_items[0] = sizeof_stream_item1;
- sizeof_items[1] = sizeof_stream_item2;
- sizeof_items[2] = sizeof_stream_item3;
- return io_signature::makev(min_streams, max_streams, sizeof_items);
+ std::vector<int> sizeof_items{ sizeof_stream_item1,
+ sizeof_stream_item2,
+ sizeof_stream_item3 };
+ gr_vector_buffer_type buftypes{ buftype1, buftype2, buftype3 };
+ return io_signature::makev(min_streams, max_streams, sizeof_items, buftypes);
}
// ------------------------------------------------------------------------
io_signature::io_signature(int min_streams,
int max_streams,
- const std::vector<int>& sizeof_stream_items)
+ const std::vector<int>& sizeof_stream_items,
+ gr_vector_buffer_type buftypes)
+ : d_min_streams(min_streams),
+ d_max_streams(max_streams),
+ d_sizeof_stream_item(sizeof_stream_items),
+ d_stream_buffer_type(buftypes)
{
if (min_streams < 0 || (max_streams != IO_INFINITE && max_streams < min_streams))
throw std::invalid_argument("gr::io_signature(1)");
@@ -75,10 +97,6 @@ io_signature::io_signature(int min_streams,
if (max_streams != 0 && sizeof_stream_items[i] < 1)
throw std::invalid_argument("gr::io_signature(3)");
}
-
- d_min_streams = min_streams;
- d_max_streams = max_streams;
- d_sizeof_stream_item = sizeof_stream_items;
}
io_signature::~io_signature() {}
@@ -97,4 +115,14 @@ std::vector<int> io_signature::sizeof_stream_items() const
return d_sizeof_stream_item;
}
+buffer_type io_signature::stream_buffer_type(size_t index) const
+{
+ return d_stream_buffer_type[std::min(index, d_stream_buffer_type.size() - 1)];
+}
+
+gr_vector_buffer_type io_signature::stream_buffer_types() const
+{
+ return d_stream_buffer_type;
+}
+
} /* namespace gr */
diff --git a/gnuradio-runtime/lib/qa_buffer.cc b/gnuradio-runtime/lib/qa_buffer.cc
index cefe548338..1c97fd070d 100644
--- a/gnuradio-runtime/lib/qa_buffer.cc
+++ b/gnuradio-runtime/lib/qa_buffer.cc
@@ -43,7 +43,7 @@ static void t0_body()
int counter = 0;
gr::buffer_sptr buf(
- gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
+ gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr()));
int last_sa;
int sa;
@@ -78,7 +78,7 @@ static void t1_body()
int read_counter = 0;
gr::buffer_sptr buf(
- gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
+ gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr()));
gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr()));
int sa;
@@ -150,7 +150,7 @@ static void t2_body()
int nitems = (64 * (1L << 10)) / sizeof(int); // 64K worth of ints
gr::buffer_sptr buf(
- gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
+ gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr()));
gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr()));
int read_counter = 0;
@@ -216,7 +216,7 @@ static void t3_body()
static const int N = 5;
gr::buffer_sptr buf(
- gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
+ gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, 1, gr::block_sptr()));
gr::buffer_reader_sptr reader[N];
int read_counter[N];
int write_counter = 0;
diff --git a/gnuradio-runtime/lib/qa_host_buffer.cc b/gnuradio-runtime/lib/qa_host_buffer.cc
new file mode 100644
index 0000000000..e8f29afe8d
--- /dev/null
+++ b/gnuradio-runtime/lib/qa_host_buffer.cc
@@ -0,0 +1,334 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2021 BlackLynx Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <gnuradio/block.h>
+#include <gnuradio/buffer_context.h>
+#include <gnuradio/buffer_reader.h>
+#include <gnuradio/host_buffer.h>
+#include <gnuradio/random.h>
+#include <boost/test/unit_test.hpp>
+#include <cstdlib>
+#include <iostream>
+
+// This is a trivial mocked up block inspired by gr::blocks::nop that is used
+// only as a placeholder for testing host_buffer below.
+class nop : public gr::block
+{
+public:
+ typedef std::shared_ptr<nop> sptr;
+ static sptr make(size_t sizeof_stream_item)
+ {
+ return gnuradio::make_block_sptr<nop>(sizeof_stream_item);
+ }
+
+ nop(size_t sizeof_stream_item)
+ : block("nop",
+ gr::io_signature::make(0, -1, sizeof_stream_item),
+ gr::io_signature::make(0, -1, sizeof_stream_item))
+ {
+ }
+
+ ~nop() override {}
+
+ int general_work(int noutput_items,
+ gr_vector_int& ninput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items) override
+ {
+ // eat any input that's available
+ for (unsigned i = 0; i < ninput_items.size(); i++)
+ consume(i, ninput_items[i]);
+
+ return noutput_items;
+ }
+};
+
+
+// ----------------------------------------------------------------------------
+// Basic checks for buffer_single_mapped using the host_buffer implementation
+// of the interface for testing.
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t0)
+{
+ int nitems = 65536 / sizeof(int);
+ gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+ gr::buffer_sptr buf(
+ gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+ buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+ gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+
+ BOOST_CHECK(buf->space_available() == nitems);
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ for (int idx = 1; idx <= 16; ++idx) {
+ buf->update_write_pointer(1000);
+ BOOST_CHECK(buf->space_available() == (nitems - (idx * 1000)));
+
+ BOOST_CHECK(rdr1->items_available() == (idx * 1000));
+ }
+
+ BOOST_CHECK(buf->space_available() == 384);
+
+ buf->update_write_pointer(buf->space_available());
+ BOOST_CHECK(buf->space_available() == 0);
+ BOOST_CHECK(rdr1->items_available() == nitems);
+ BOOST_CHECK(buf->space_available() == 0);
+}
+
+// ----------------------------------------------------------------------------
+// Basic checks for buffer_single_mapped using the host_buffer implementation
+// of the interface for testing.
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t1)
+{
+ int nitems = 65536 / sizeof(int);
+ gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+ gr::buffer_sptr buf(
+ gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+ buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+ gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+
+ int space = buf->space_available();
+ BOOST_CHECK(nitems == space);
+
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ buf->update_write_pointer(nitems);
+ BOOST_CHECK(buf->space_available() == 0);
+ BOOST_CHECK(rdr1->items_available() == nitems);
+
+ for (int idx = 1; idx <= 16; ++idx) {
+ rdr1->update_read_pointer(1000);
+ BOOST_CHECK(rdr1->items_available() == (nitems - (idx * 1000)));
+
+ space = buf->space_available();
+ BOOST_CHECK(space == (idx * 1000));
+ }
+
+ BOOST_CHECK(rdr1->items_available() == 384);
+ rdr1->update_read_pointer(384);
+ BOOST_CHECK(rdr1->items_available() == 0);
+}
+
+// ----------------------------------------------------------------------------
+// Basic check reader/write wrapping of buffer_single_mapped with 1 reader.
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t2)
+{
+ int nitems = 65536 / sizeof(int);
+ gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+ gr::buffer_sptr buf(
+ gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+ buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+ gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+
+ int space = buf->space_available();
+ BOOST_CHECK(nitems == space);
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ buf->update_write_pointer(nitems);
+ BOOST_CHECK(buf->space_available() == 0);
+
+ for (int idx = 1; idx <= 16; ++idx) {
+ rdr1->update_read_pointer(1000);
+ BOOST_CHECK(rdr1->items_available() == (nitems - (idx * 1000)));
+
+ space = buf->space_available();
+
+ if (idx <= 9)
+ BOOST_CHECK(space == (idx * 1000));
+ else
+ BOOST_CHECK(space == ((idx * 1000) - (nitems / 2)));
+
+ if (idx == 9) {
+ buf->update_write_pointer(nitems / 2);
+ }
+ }
+
+ // At this point we can only read up until the end of the buffer even though
+ // additional data is available at the beginning of the buffer
+ BOOST_CHECK(rdr1->items_available() == 384);
+ rdr1->update_read_pointer(384);
+
+ // Now the (nitems / 2) at the beginning of the buffer should be available
+ BOOST_CHECK(rdr1->items_available() == (nitems / 2));
+
+ for (int idx = 0; idx < 4; ++idx)
+ rdr1->update_read_pointer(1024);
+
+ BOOST_CHECK(buf->space_available() == (nitems / 2));
+ BOOST_CHECK(rdr1->items_available() == (nitems / 4));
+
+ for (int idx = 0; idx < 4; ++idx)
+ rdr1->update_read_pointer(1000);
+
+ BOOST_CHECK(buf->space_available() == (nitems / 2));
+ BOOST_CHECK(rdr1->items_available() == 96);
+
+ rdr1->update_read_pointer(96);
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ BOOST_CHECK(buf->space_available() == (nitems / 2));
+}
+
+// ----------------------------------------------------------------------------
+// Basic check reader/write wrapping of buffer_single_mapped with 2 readers.
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t3)
+{
+ int nitems = 65536 / sizeof(int);
+ gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+ gr::buffer_sptr buf(
+ gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+ buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+ gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+ gr::buffer_reader_sptr rdr2(gr::buffer_add_reader(buf, 0, nop));
+
+ int space = buf->space_available();
+ BOOST_CHECK(nitems == space);
+ BOOST_CHECK(rdr1->items_available() == 0);
+ BOOST_CHECK(rdr2->items_available() == 0);
+
+ buf->update_write_pointer(nitems);
+ BOOST_CHECK(buf->space_available() == 0);
+ BOOST_CHECK(rdr1->items_available() == nitems);
+ BOOST_CHECK(rdr2->items_available() == nitems);
+
+ for (int idx = 1; idx <= 16; ++idx) {
+ rdr1->update_read_pointer(1000);
+ BOOST_CHECK(rdr1->items_available() == (nitems - (idx * 1000)));
+
+ // Reader 2 hasn't read anything so space available should remain 0
+ BOOST_CHECK(buf->space_available() == 0);
+ }
+
+ int last_rdr1_available = rdr1->items_available();
+ int increment = last_rdr1_available / 4;
+
+ for (int idx = 1; idx <= 16; ++idx) {
+ rdr2->update_read_pointer(1000);
+ BOOST_CHECK(rdr2->items_available() == (nitems - (idx * 1000)));
+
+ BOOST_CHECK(rdr1->items_available() == last_rdr1_available);
+ if (idx % 4 == 0) {
+ rdr1->update_read_pointer(increment);
+ BOOST_CHECK(rdr1->items_available() == (last_rdr1_available - increment));
+ last_rdr1_available = rdr1->items_available();
+ }
+
+ BOOST_CHECK(buf->space_available() == (idx * 1000));
+ }
+}
+
+// ----------------------------------------------------------------------------
+// Basic check of output blocked callback
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t4)
+{
+ int nitems = 65536 / sizeof(int);
+ gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+ gr::buffer_sptr buf(
+ gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+ buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+ gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+
+ BOOST_CHECK(nitems == buf->space_available());
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ buf->update_write_pointer(nitems / 2);
+ BOOST_CHECK(buf->space_available() == (nitems / 2));
+ BOOST_CHECK(rdr1->items_available() == (nitems / 2));
+
+ rdr1->update_read_pointer(nitems / 2);
+ BOOST_CHECK(buf->space_available() == (nitems / 2));
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ buf->update_write_pointer(8000);
+ BOOST_CHECK(buf->space_available() == 192);
+
+ bool ready = buf->output_blkd_cb_ready(200);
+ BOOST_CHECK(ready == true);
+
+ bool success = buf->output_blocked_callback(200);
+ BOOST_CHECK(success == true);
+ BOOST_CHECK(buf->space_available() == 8384);
+ BOOST_CHECK(rdr1->items_available() == 8000);
+
+ rdr1->update_read_pointer(4000);
+ BOOST_CHECK(buf->space_available() == 8384);
+ BOOST_CHECK(rdr1->items_available() == 4000);
+
+ buf->update_write_pointer(4000);
+ BOOST_CHECK(buf->space_available() == 4384);
+ BOOST_CHECK(rdr1->items_available() == 8000);
+
+ rdr1->update_read_pointer(8000);
+ BOOST_CHECK(buf->space_available() == 4384);
+ BOOST_CHECK(rdr1->items_available() == 0);
+}
+
+// ----------------------------------------------------------------------------
+// Basic check of input blocked callback
+// ----------------------------------------------------------------------------
+BOOST_AUTO_TEST_CASE(t5)
+{
+ int nitems = 65536 / sizeof(int);
+ gr::block_sptr nop(nop::make(sizeof(int))); // used as placeholder
+
+ gr::buffer_sptr buf(
+ gr::host_buffer::make_host_buffer(nitems, sizeof(int), nitems, 1, nop, nop));
+ buf->set_context(gr::buffer_context::HOST_TO_HOST);
+
+ gr::buffer_reader_sptr rdr1(gr::buffer_add_reader(buf, 0, nop));
+
+ BOOST_CHECK(nitems == buf->space_available());
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ buf->update_write_pointer(16000);
+ BOOST_CHECK(buf->space_available() == 384);
+ BOOST_CHECK(rdr1->items_available() == 16000);
+
+ rdr1->update_read_pointer(16000);
+ BOOST_CHECK(buf->space_available() == 384);
+ BOOST_CHECK(rdr1->items_available() == 0);
+
+ buf->update_write_pointer(384);
+ BOOST_CHECK(buf->space_available() == 16000);
+ BOOST_CHECK(rdr1->items_available() == 384);
+
+ buf->update_write_pointer(116);
+ BOOST_CHECK(buf->space_available() == 15884);
+ BOOST_CHECK(rdr1->items_available() == 384);
+
+ bool ready = rdr1->input_blkd_cb_ready(400);
+ BOOST_CHECK(ready == true);
+
+ bool success = rdr1->input_blocked_callback(400, rdr1->items_available());
+ BOOST_CHECK(success == true);
+ BOOST_CHECK(rdr1->items_available() == 500);
+
+ rdr1->update_read_pointer(500);
+ BOOST_CHECK(buf->space_available() == 15884);
+ BOOST_CHECK(rdr1->items_available() == 0);
+}
diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt b/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt
index dbc74b0529..2623bcf9c3 100644
--- a/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt
+++ b/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt
@@ -18,6 +18,7 @@ messages/msg_queue_python.cc
# block_registry_python.cc
buffer_python.cc
buffer_reader_python.cc
+ buffer_type_python.cc
constants_python.cc
endianness_python.cc
expj_python.cc
diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc
index 9660a7f245..4ba7742b15 100644
--- a/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc
+++ b/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc
@@ -14,7 +14,7 @@
/* BINDTOOL_GEN_AUTOMATIC(0) */
/* BINDTOOL_USE_PYGCCXML(0) */
/* BINDTOOL_HEADER_FILE(block.h) */
-/* BINDTOOL_HEADER_FILE_HASH(238d129ad018daa3146ff1d8867dc356) */
+/* BINDTOOL_HEADER_FILE_HASH(23fce54cc3292f62ca2551fb3f409f77) */
/***********************************************************************************/
#include <pybind11/complex.h>
diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc
index de7d4edf1c..4a991397be 100644
--- a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc
+++ b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc
@@ -14,7 +14,7 @@
/* BINDTOOL_GEN_AUTOMATIC(0) */
/* BINDTOOL_USE_PYGCCXML(0) */
/* BINDTOOL_HEADER_FILE(buffer.h) */
-/* BINDTOOL_HEADER_FILE_HASH(e5247f4fe5b5873c66eed72880194981) */
+/* BINDTOOL_HEADER_FILE_HASH(e34c34f70f65bbc7dfbc45adcadf7796) */
/***********************************************************************************/
#include <pybind11/complex.h>
diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc
index 23e2f39d10..2c86dd9fe6 100644
--- a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc
+++ b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc
@@ -14,7 +14,7 @@
/* BINDTOOL_GEN_AUTOMATIC(0) */
/* BINDTOOL_USE_PYGCCXML(0) */
/* BINDTOOL_HEADER_FILE(buffer_reader.h) */
-/* BINDTOOL_HEADER_FILE_HASH(451fcbd61f40b7d17a151474869aad75) */
+/* BINDTOOL_HEADER_FILE_HASH(5a48682a66afd451d2f145b352c95a7e) */
/***********************************************************************************/
#include <pybind11/complex.h>
diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_type_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_type_python.cc
new file mode 100644
index 0000000000..28bfcfac3c
--- /dev/null
+++ b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_type_python.cc
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2020 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+/***********************************************************************************/
+/* This file is automatically generated using bindtool and can be manually edited */
+/* The following lines can be configured to regenerate this file during cmake */
+/* If manual edits are made, the following tags should be modified accordingly. */
+/* BINDTOOL_GEN_AUTOMATIC(0) */
+/* BINDTOOL_USE_PYGCCXML(0) */
+/* BINDTOOL_HEADER_FILE(buffer_type.h) */
+/* BINDTOOL_HEADER_FILE_HASH(0b679f644e232dd519bb812b93c8c0e3) */
+/***********************************************************************************/
+
+#include <pybind11/complex.h>
+#include <pybind11/pybind11.h>
+#include <pybind11/stl.h>
+
+namespace py = pybind11;
+
+#include <gnuradio/buffer_type.h>
+// pydoc.h is automatically generated in the build directory
+#include <buffer_type_pydoc.h>
+
+// NOTE: buffer_type is really a typedef of const buffer_type_base& so
+// buffer_type_base is used below because that's the type we really care about
+void bind_buffer_type(py::module& m)
+{
+
+ using buffer_type_base = ::gr::buffer_type_base;
+
+
+ py::class_<buffer_type_base>(m, "buffer_type_base", D(buffer_type_base))
+
+ .def("name", &buffer_type_base::name, D(buffer_type_base, name));
+} \ No newline at end of file
diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/docstrings/buffer_type_pydoc_template.h b/gnuradio-runtime/python/gnuradio/gr/bindings/docstrings/buffer_type_pydoc_template.h
new file mode 100644
index 0000000000..cad9363030
--- /dev/null
+++ b/gnuradio-runtime/python/gnuradio/gr/bindings/docstrings/buffer_type_pydoc_template.h
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2020 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+#include "pydoc_macros.h"
+#define D(...) DOC(gr, __VA_ARGS__)
+/*
+ This file contains placeholders for docstrings for the Python bindings.
+ Do not edit! These were automatically extracted during the binding process
+ and will be overwritten during the build process
+ */
+
+
+static const char* __doc_gr_buffer_type_base = R"doc()doc";
+
+
+static const char* __doc_gr_buffer_type_base_name = R"doc()doc";
diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/io_signature_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/io_signature_python.cc
index 56ed5c2fe5..d6ddf570dd 100644
--- a/gnuradio-runtime/python/gnuradio/gr/bindings/io_signature_python.cc
+++ b/gnuradio-runtime/python/gnuradio/gr/bindings/io_signature_python.cc
@@ -14,7 +14,7 @@
/* BINDTOOL_GEN_AUTOMATIC(0) */
/* BINDTOOL_USE_PYGCCXML(0) */
/* BINDTOOL_HEADER_FILE(io_signature.h) */
-/* BINDTOOL_HEADER_FILE_HASH(aa246441b45c1a5b872509ed410615ae) */
+/* BINDTOOL_HEADER_FILE_HASH(baf27e696237b6542ec62a4c5627ea1d) */
/***********************************************************************************/
#include <pybind11/complex.h>
@@ -40,6 +40,7 @@ void bind_io_signature(py::module& m)
py::arg("min_streams"),
py::arg("max_streams"),
py::arg("sizeof_stream_item"),
+ py::arg("buftype") = gr::buffer_double_mapped::type,
D(io_signature, make))
@@ -49,6 +50,8 @@ void bind_io_signature(py::module& m)
py::arg("max_streams"),
py::arg("sizeof_stream_item1"),
py::arg("sizeof_stream_item2"),
+ py::arg("buftype1") = gr::buffer_double_mapped::type,
+ py::arg("buftype2") = gr::buffer_double_mapped::type,
D(io_signature, make2))
@@ -59,14 +62,28 @@ void bind_io_signature(py::module& m)
py::arg("sizeof_stream_item1"),
py::arg("sizeof_stream_item2"),
py::arg("sizeof_stream_item3"),
+ py::arg("buftype1") = gr::buffer_double_mapped::type,
+ py::arg("buftype2") = gr::buffer_double_mapped::type,
+ py::arg("buftype3") = gr::buffer_double_mapped::type,
D(io_signature, make3))
+ .def_static(
+ "makev",
+ py::overload_cast<int, int, const std::vector<int>&>(&io_signature::makev),
+ py::arg("min_streams"),
+ py::arg("max_streams"),
+ py::arg("sizeof_stream_items"),
+ D(io_signature, makev))
.def_static("makev",
- &io_signature::makev,
+ py::overload_cast<int,
+ int,
+ const std::vector<int>&,
+ gr::gr_vector_buffer_type>(&io_signature::makev),
py::arg("min_streams"),
py::arg("max_streams"),
py::arg("sizeof_stream_items"),
+ py::arg("buftypes"),
D(io_signature, makev))
diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/python_bindings.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/python_bindings.cc
index 1f39775f81..760578254d 100644
--- a/gnuradio-runtime/python/gnuradio/gr/bindings/python_bindings.cc
+++ b/gnuradio-runtime/python/gnuradio/gr/bindings/python_bindings.cc
@@ -29,6 +29,7 @@ void bind_block_detail(py::module&);
void bind_block_gateway(py::module&);
// void bind_block_registry(py::module&);
void bind_buffer(py::module&);
+void bind_buffer_type(py::module& m);
void bind_constants(py::module&);
void bind_endianness(py::module&);
void bind_expj(py::module&);
@@ -121,6 +122,7 @@ PYBIND11_MODULE(gr_python, m)
bind_msg_handler(m);
bind_msg_queue(m);
+ bind_buffer_type(m);
bind_io_signature(m);
// // bind_attributes(m);
bind_basic_block(m);
diff --git a/gr-audio/lib/portaudio/portaudio_sink.cc b/gr-audio/lib/portaudio/portaudio_sink.cc
index ea27c7c0f9..607e69972f 100644
--- a/gr-audio/lib/portaudio/portaudio_sink.cc
+++ b/gr-audio/lib/portaudio/portaudio_sink.cc
@@ -66,7 +66,7 @@ void portaudio_sink::create_ringbuffer(void)
// FYI, the buffer indices are in units of samples.
d_writer = gr::make_buffer(
- N_BUFFERS * bufsize_samples, sizeof(sample_t), N_BUFFERS * bufsize_samples);
+ N_BUFFERS * bufsize_samples, sizeof(sample_t), N_BUFFERS * bufsize_samples, 1);
d_reader = gr::buffer_add_reader(d_writer, 0);
}
diff --git a/gr-audio/lib/portaudio/portaudio_source.cc b/gr-audio/lib/portaudio/portaudio_source.cc
index 63a3d5e1ba..a4916009f2 100644
--- a/gr-audio/lib/portaudio/portaudio_source.cc
+++ b/gr-audio/lib/portaudio/portaudio_source.cc
@@ -65,7 +65,7 @@ void portaudio_source::create_ringbuffer(void)
// FYI, the buffer indices are in units of samples.
d_writer = gr::make_buffer(
- N_BUFFERS * bufsize_samples, sizeof(sample_t), N_BUFFERS * bufsize_samples);
+ N_BUFFERS * bufsize_samples, sizeof(sample_t), N_BUFFERS * bufsize_samples, 1);
d_reader = gr::buffer_add_reader(d_writer, 0);
}