summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gnuradio-runtime/include/gnuradio/CMakeLists.txt5
-rw-r--r--gnuradio-runtime/include/gnuradio/basic_block.h12
-rw-r--r--gnuradio-runtime/include/gnuradio/block.h83
-rw-r--r--gnuradio-runtime/include/gnuradio/block_detail.h15
-rw-r--r--gnuradio-runtime/include/gnuradio/buffer.h275
-rw-r--r--gnuradio-runtime/include/gnuradio/buffer_double_mapped.h116
-rw-r--r--gnuradio-runtime/include/gnuradio/buffer_reader.h192
-rw-r--r--gnuradio-runtime/include/gnuradio/buffer_reader_sm.h65
-rw-r--r--gnuradio-runtime/include/gnuradio/buffer_single_mapped.h167
-rw-r--r--gnuradio-runtime/include/gnuradio/buffer_type.h88
-rw-r--r--gnuradio-runtime/include/gnuradio/custom_lock.h64
-rw-r--r--gnuradio-runtime/include/gnuradio/logger.h2
-rw-r--r--gnuradio-runtime/lib/CMakeLists.txt5
-rw-r--r--gnuradio-runtime/lib/block.cc141
-rw-r--r--gnuradio-runtime/lib/block_detail.cc1
-rw-r--r--gnuradio-runtime/lib/block_executor.cc206
-rw-r--r--gnuradio-runtime/lib/buffer.cc273
-rw-r--r--gnuradio-runtime/lib/buffer_double_mapped.cc155
-rw-r--r--gnuradio-runtime/lib/buffer_reader.cc180
-rw-r--r--gnuradio-runtime/lib/buffer_reader_sm.cc149
-rw-r--r--gnuradio-runtime/lib/buffer_single_mapped.cc297
-rw-r--r--gnuradio-runtime/lib/buffer_type.cc18
-rw-r--r--gnuradio-runtime/lib/flat_flowgraph.cc206
-rw-r--r--gnuradio-runtime/lib/flat_flowgraph.h3
-rw-r--r--gnuradio-runtime/lib/qa_buffer.cc14
-rw-r--r--gnuradio-runtime/lib/tpb_detail.cc1
-rw-r--r--gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt1
-rw-r--r--gnuradio-runtime/python/gnuradio/gr/bindings/basic_block_python.cc4
-rw-r--r--gnuradio-runtime/python/gnuradio/gr/bindings/block_detail_python.cc5
-rw-r--r--gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc2
-rw-r--r--gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc5
-rw-r--r--gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc126
-rw-r--r--gnuradio-runtime/python/gnuradio/gr/bindings/logger_python.cc4
-rw-r--r--gnuradio-runtime/python/gnuradio/gr_unittest.py19
-rw-r--r--gr-audio/lib/portaudio/portaudio_sink.cc3
-rw-r--r--gr-audio/lib/portaudio/portaudio_sink.h1
-rw-r--r--gr-audio/lib/portaudio/portaudio_source.cc3
-rw-r--r--gr-audio/lib/portaudio/portaudio_source.h1
-rw-r--r--gr-blocks/lib/qa_gr_block.cc1
-rw-r--r--gr-blocks/lib/qa_gr_top_block.cc3
-rw-r--r--gr-fec/python/fec/_qa_helper.py4
-rw-r--r--gr-fec/python/fec/extended_encoder.py4
-rw-r--r--gr-fec/python/fec/qa_depuncture.py10
-rw-r--r--gr-fec/python/fec/qa_fecapi_dummy.py33
-rw-r--r--gr-fec/python/fec/qa_fecapi_repetition.py16
45 files changed, 2463 insertions, 515 deletions
diff --git a/gnuradio-runtime/include/gnuradio/CMakeLists.txt b/gnuradio-runtime/include/gnuradio/CMakeLists.txt
index 4de1ceb353..9e22212b7b 100644
--- a/gnuradio-runtime/include/gnuradio/CMakeLists.txt
+++ b/gnuradio-runtime/include/gnuradio/CMakeLists.txt
@@ -20,7 +20,12 @@ install(FILES
block_gateway.h
block_registry.h
buffer.h
+ buffer_double_mapped.h
+ buffer_reader.h
+ buffer_reader_sm.h
+ buffer_single_mapped.h
constants.h
+ buffer_type.h
endianness.h
expj.h
flowgraph.h
diff --git a/gnuradio-runtime/include/gnuradio/basic_block.h b/gnuradio-runtime/include/gnuradio/basic_block.h
index 7a265cd05e..3816e8ab98 100644
--- a/gnuradio-runtime/include/gnuradio/basic_block.h
+++ b/gnuradio-runtime/include/gnuradio/basic_block.h
@@ -127,6 +127,18 @@ protected:
// Message passing interface
pmt::pmt_t d_message_subscribers;
+ /*!
+ * \brief This is meant to be called by derived classes (e.g. block) to get
+ * a shared pointer internally. This is needed because
+ * std::enable_shared_from_this doesn't seem to work with derived classes
+ * in an inheritance hierarchy.
+ */
+ template <typename Derived>
+ std::shared_ptr<Derived> shared_from_base()
+ {
+ return std::static_pointer_cast<Derived>(shared_from_this());
+ }
+
public:
pmt::pmt_t message_subscribers(pmt::pmt_t port);
~basic_block() override;
diff --git a/gnuradio-runtime/include/gnuradio/block.h b/gnuradio-runtime/include/gnuradio/block.h
index 0e25509d00..ee90a35d2c 100644
--- a/gnuradio-runtime/include/gnuradio/block.h
+++ b/gnuradio-runtime/include/gnuradio/block.h
@@ -11,8 +11,11 @@
#ifndef INCLUDED_GR_RUNTIME_BLOCK_H
#define INCLUDED_GR_RUNTIME_BLOCK_H
+#include <memory>
+
#include <gnuradio/api.h>
#include <gnuradio/basic_block.h>
+#include <gnuradio/buffer_type.h>
#include <gnuradio/config.h>
#include <gnuradio/logger.h>
#include <gnuradio/tags.h>
@@ -514,6 +517,78 @@ public:
*/
void set_min_output_buffer(int port, long min_output_buffer);
+ /*!
+ * \brief Allocate the block_detail and necessary output buffers for this
+ * block.
+ */
+ void allocate_detail(int ninputs,
+ int noutputs,
+ const std::vector<int>& downstream_max_nitems_vec,
+ const std::vector<uint64_t>& downstream_lcm_nitems_vec);
+
+ // --------------- Custom buffer-related functions -------------
+
+ /*!
+ * \brief Replace the block's buffer with a new one owned by the block_owner
+ * parameter
+ *
+ * \details
+ * This function is used to replace the buffer on the specified output port
+ * of the block with a new buffer that is "owned" by the specified block. This
+ * function will only be called if a downstream block is using a custom buffer
+ * that is incompatible with the default buffer type created by this block.
+ *
+ */
+ buffer_sptr replace_buffer(uint32_t out_port, block_sptr block_owner);
+
+ /*!
+ * \brief Return the type of custom buffer used by the block
+ *
+ * \details
+ * Blocks that wish to allocate custom buffers should override this function.
+ */
+ virtual buffer_type_t get_buffer_type()
+ {
+#if DEBUG_SINGLE_MAPPED
+ return buftype_CUSTOM_HOST::get();
+#else
+ return buftype_DEFAULT_NON_CUSTOM::get();
+#endif
+ }
+
+ /*!
+ * \brief Allocate a custom buffer for the block
+ *
+ * \details
+ * Blocks that wish to allocate custom buffers should override this function.
+ *
+ * \param size the size of the buffer to allocate in bytes
+ */
+ virtual char* allocate_custom_buffer(size_t size)
+ {
+#if DEBUG_SINGLE_MAPPED
+ return new char[size]();
+#else
+ return nullptr;
+#endif
+ }
+
+ /*!
+ * \brief Free a custom buffer previously allocated by allocate_custom_buffer()
+ *
+ * \details
+ * Blocks that wish to allocate custom buffers should override this function.
+ *
+ * \param buffer a pointer to the buffer
+ */
+ virtual void free_custom_buffer(char* buffer)
+ {
+#if DEBUG_SINGLE_MAPPED
+ delete[] buffer;
+#endif
+ }
+
+
// --------------- Performance counter functions -------------
/*!
@@ -909,6 +984,14 @@ protected:
void enable_update_rate(bool en);
+ /*!
+ * \brief Allocate a buffer for the given output port of this block. Note
+ * that the downstream max number of items must be passed in to this
+ * function for consideration.
+ */
+ buffer_sptr
+ allocate_buffer(int port, int downstream_max_nitems, uint64_t downstream_lcm_nitems);
+
std::vector<long> d_max_output_buffer;
std::vector<long> d_min_output_buffer;
diff --git a/gnuradio-runtime/include/gnuradio/block_detail.h b/gnuradio-runtime/include/gnuradio/block_detail.h
index 454945f0f5..4ab2ae6ed1 100644
--- a/gnuradio-runtime/include/gnuradio/block_detail.h
+++ b/gnuradio-runtime/include/gnuradio/block_detail.h
@@ -12,6 +12,8 @@
#define INCLUDED_GR_RUNTIME_BLOCK_DETAIL_H
#include <gnuradio/api.h>
+#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_reader.h>
#include <gnuradio/high_res_timer.h>
#include <gnuradio/logger.h>
#include <gnuradio/runtime_types.h>
@@ -193,6 +195,19 @@ public:
*/
int set_thread_priority(int priority);
+ /*!
+ * Post general_work() cleanup to decrement the active counts for all inputs
+ * and outputs.
+ */
+ void post_work_cleanup()
+ {
+ // Decrement active counts for all inputs and outputs
+ for (int i = 0; i < noutputs(); i++)
+ output(i)->decrement_active();
+ for (int i = 0; i < ninputs(); i++)
+ input(i)->buffer()->decrement_active();
+ }
+
bool threaded; // set if thread is currently running.
gr::thread::gr_thread_t thread; // portable thread handle
diff --git a/gnuradio-runtime/include/gnuradio/buffer.h b/gnuradio-runtime/include/gnuradio/buffer.h
index 55313df6c0..037fade172 100644
--- a/gnuradio-runtime/include/gnuradio/buffer.h
+++ b/gnuradio-runtime/include/gnuradio/buffer.h
@@ -12,16 +12,24 @@
#define INCLUDED_GR_RUNTIME_BUFFER_H
#include <gnuradio/api.h>
+#include <gnuradio/custom_lock.h>
#include <gnuradio/logger.h>
#include <gnuradio/runtime_types.h>
#include <gnuradio/tags.h>
#include <gnuradio/thread/thread.h>
+#include <boost/weak_ptr.hpp>
+#include <iostream>
#include <map>
#include <memory>
+
namespace gr {
class vmcircbuf;
+class buffer_reader;
+class buffer_reader_sm;
+
+enum class BufferMappingType { DoubleMapped, SingleMapped };
/*!
* \brief Allocate a buffer that holds at least \p nitems of size \p sizeof_item.
@@ -36,13 +44,15 @@ class vmcircbuf;
*/
GR_RUNTIME_API buffer_sptr make_buffer(int nitems,
size_t sizeof_item,
- block_sptr link = block_sptr());
+ uint64_t downstream_lcm_nitems,
+ block_sptr link = block_sptr(),
+ block_sptr buf_owner = block_sptr());
/*!
* \brief Single writer, multiple reader fifo.
* \ingroup internal
*/
-class GR_RUNTIME_API buffer
+class GR_RUNTIME_API buffer : public custom_lock_if
{
public:
gr::logger_ptr d_logger;
@@ -51,14 +61,19 @@ public:
virtual ~buffer();
/*!
+ * \brief return the buffer's mapping type
+ */
+ BufferMappingType get_mapping_type() { return d_buf_map_type; }
+
+ /*!
* \brief return number of items worth of space available for writing
*/
- int space_available();
+ virtual int space_available() = 0;
/*!
* \brief return size of this buffer in items
*/
- int bufsize() const { return d_bufsize; }
+ unsigned int bufsize() const { return d_bufsize; }
/*!
* \brief return the base address of the buffer
@@ -71,7 +86,7 @@ public:
* The return value points at space that can hold at least
* space_available() items.
*/
- void* write_pointer();
+ virtual void* write_pointer();
/*!
* \brief tell buffer that we wrote \p nitems into it
@@ -93,10 +108,22 @@ public:
uint64_t nitems_written() { return d_abs_write_offset; }
- void reset_nitem_counter() { d_abs_write_offset = 0; }
+ void reset_nitem_counter()
+ {
+ d_write_index = 0;
+ d_abs_write_offset = 0;
+ }
size_t get_sizeof_item() { return d_sizeof_item; }
+ uint64_t get_downstream_lcm_nitems() { return d_downstream_lcm_nitems; }
+
+ virtual void update_reader_block_history(unsigned history, int delay)
+ {
+ d_max_reader_history = std::max(d_max_reader_history, history);
+ d_has_history = (d_max_reader_history > 1);
+ }
+
/*!
* \brief Adds a new tag to the buffer.
*
@@ -139,12 +166,67 @@ public:
return d_item_tags.upper_bound(x);
}
+ /*!
+ * \brief Returns true if the current thread is ready to execute
+ * output_blocked_callback(), false otherwise. Note if the default
+ * output_blocked_callback is overridden this function should also be
+ * overridden.
+ */
+ virtual bool output_blkd_cb_ready(int output_multiple) { return false; }
+
+ /*!
+ * \brief Callback function that the scheduler will call when it determines
+ * that the output is blocked. Override this function if needed.
+ */
+ virtual bool output_blocked_callback(int output_multiple, bool force = false)
+ {
+ return false;
+ }
+
+ /*!
+ * \brief Increment the number of active pointers for this buffer.
+ */
+ inline void increment_active()
+ {
+ gr::thread::scoped_lock lock(d_mutex);
+
+ d_cv.wait(lock, [this]() { return d_callback_flag == false; });
+ ++d_active_pointer_counter;
+ }
+
+ /*!
+ * \brief Decrement the number of active pointers for this buffer and signal
+ * anyone waiting when the count reaches zero.
+ */
+ inline void decrement_active()
+ {
+ gr::thread::scoped_lock lock(d_mutex);
+
+ if (--d_active_pointer_counter == 0)
+ d_cv.notify_all();
+ }
+
+ /*!
+ * \brief "on_lock" function from the custom_lock_if.
+ */
+ void on_lock(gr::thread::scoped_lock& lock);
+
+ /*!
+ * \brief "on_unlock" function from the custom_lock_if.
+ */
+ void on_unlock();
+
+ friend std::ostream& operator<<(std::ostream& os, const buffer& buf);
+
// -------------------------------------------------------------------------
private:
friend class buffer_reader;
+ friend class buffer_reader_sm;
+
friend GR_RUNTIME_API buffer_sptr make_buffer(int nitems,
size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
block_sptr link);
friend GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf,
int nzero_preload,
@@ -154,12 +236,18 @@ private:
protected:
char* d_base; // base address of buffer inside d_vmcircbuf.
unsigned int d_bufsize; // in items
+ BufferMappingType d_buf_map_type;
// Keep track of maximum sample delay of any reader; Only prune tags past this.
unsigned d_max_reader_delay;
-private:
- std::unique_ptr<gr::vmcircbuf> d_vmcircbuf;
+ // Keep track of the maximum sample history requirements of all blocks that
+ // consume from this buffer
+ unsigned d_max_reader_history;
+
+ // Indicates if d_max_reader_history > 1
+ bool d_has_history;
+
size_t d_sizeof_item; // in bytes
std::vector<buffer_reader*> d_readers;
std::weak_ptr<block> d_link; // block that writes to this buffer
@@ -167,6 +255,7 @@ private:
//
// The mutex protects d_write_index, d_abs_write_offset, d_done, d_item_tags
// and the d_read_index's and d_abs_read_offset's in the buffer readers.
+ // Also d_callback_flag and d_active_pointer_counter.
//
gr::thread::mutex d_mutex;
unsigned int d_write_index; // in items [0,d_bufsize)
@@ -174,45 +263,47 @@ private:
bool d_done;
std::multimap<uint64_t, tag_t> d_item_tags;
uint64_t d_last_min_items_read;
+ //
+ gr::thread::condition_variable d_cv;
+ bool d_callback_flag;
+ uint32_t d_active_pointer_counter;
- unsigned index_add(unsigned a, unsigned b)
- {
- unsigned s = a + b;
-
- if (s >= d_bufsize)
- s -= d_bufsize;
-
- assert(s < d_bufsize);
- return s;
- }
-
- unsigned index_sub(unsigned a, unsigned b)
- {
- int s = a - b;
+ uint64_t d_downstream_lcm_nitems;
+ uint64_t d_write_multiple;
- if (s < 0)
- s += d_bufsize;
+ /*!
+ * \brief Increment read or write index for this buffer
+ */
+ virtual unsigned index_add(unsigned a, unsigned b) = 0;
- assert((unsigned)s < d_bufsize);
- return s;
- }
+ /*!
+ * \brief Decrement read or write index for this buffer
+ */
+ virtual unsigned index_sub(unsigned a, unsigned b) = 0;
- virtual bool allocate_buffer(int nitems, size_t sizeof_item);
+ virtual bool allocate_buffer(int nitems, size_t sizeof_item) { return false; };
/*!
* \brief constructor is private. Use gr_make_buffer to create instances.
*
* Allocate a buffer that holds at least \p nitems of size \p sizeof_item.
*
+ * \param buftype is an enum type that describes the buffer TODO: fix me
* \param nitems is the minimum number of items the buffer will hold.
* \param sizeof_item is the size of an item in bytes.
+ * \param downstream_lcm_nitems is the least common multiple of the items to
+ * read by downstream block(s)
* \param link is the block that writes to this buffer.
*
* The total size of the buffer will be rounded up to a system
* dependent boundary. This is typically the system page size, but
* under MS windows is 64KB.
*/
- buffer(int nitems, size_t sizeof_item, block_sptr link);
+ buffer(BufferMappingType buftype,
+ int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ block_sptr link);
/*!
* \brief disassociate \p reader from this buffer
@@ -220,135 +311,9 @@ private:
void drop_reader(buffer_reader* reader);
};
-/*!
- * \brief Create a new gr::buffer_reader and attach it to buffer \p buf
- * \param buf is the buffer the \p gr::buffer_reader reads from.
- * \param nzero_preload -- number of zero items to "preload" into buffer.
- * \param link is the block that reads from the buffer using this gr::buffer_reader.
- * \param delay Optional setting to declare the buffer's sample delay.
- */
-GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf,
- int nzero_preload,
- block_sptr link = block_sptr(),
- int delay = 0);
-
//! returns # of buffers currently allocated
GR_RUNTIME_API long buffer_ncurrently_allocated();
-
-// ---------------------------------------------------------------------------
-
-/*!
- * \brief How we keep track of the readers of a gr::buffer.
- * \ingroup internal
- */
-class GR_RUNTIME_API buffer_reader
-{
-public:
- ~buffer_reader();
-
- /*!
- * Declares the sample delay for this reader.
- *
- * See gr::block::declare_sample_delay for details.
- *
- * \param delay The new sample delay
- */
- void declare_sample_delay(unsigned delay);
-
- /*!
- * Gets the sample delay for this reader.
- *
- * See gr::block::sample_delay for details.
- */
- unsigned sample_delay() const;
-
- /*!
- * \brief Return number of items available for reading.
- */
- int items_available() const;
-
- /*!
- * \brief Return buffer this reader reads from.
- */
- buffer_sptr buffer() const { return d_buffer; }
-
- /*!
- * \brief Return maximum number of items that could ever be available for reading.
- * This is used as a sanity check in the scheduler to avoid looping forever.
- */
- int max_possible_items_available() const { return d_buffer->d_bufsize - 1; }
-
- /*!
- * \brief return pointer to read buffer.
- *
- * The return value points to items_available() number of items
- */
- const void* read_pointer();
-
- /*
- * \brief tell buffer we read \p items from it
- */
- void update_read_pointer(int nitems);
-
- void set_done(bool done) { d_buffer->set_done(done); }
- bool done() const { return d_buffer->done(); }
-
- gr::thread::mutex* mutex() { return d_buffer->mutex(); }
-
- uint64_t nitems_read() { return d_abs_read_offset; }
-
- void reset_nitem_counter() { d_abs_read_offset = 0; }
-
- size_t get_sizeof_item() { return d_buffer->get_sizeof_item(); }
-
- /*!
- * \brief Return the block that reads via this reader.
- *
- */
- block_sptr link() { return block_sptr(d_link); }
-
- /*!
- * \brief Given a [start,end), returns a vector all tags in the range.
- *
- * Get a vector of tags in given range. Range of counts is from start to end-1.
- *
- * Tags are tuples of:
- * (item count, source id, key, value)
- *
- * \param v a vector reference to return tags into
- * \param abs_start a uint64 count of the start of the range of interest
- * \param abs_end a uint64 count of the end of the range of interest
- * \param id the unique ID of the block to make sure already deleted tags
- * are not returned
- */
- void get_tags_in_range(std::vector<tag_t>& v,
- uint64_t abs_start,
- uint64_t abs_end,
- long id);
-
- // -------------------------------------------------------------------------
-
-private:
- friend class buffer;
- friend GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf,
- int nzero_preload,
- block_sptr link,
- int delay);
-
- buffer_sptr d_buffer;
- unsigned int d_read_index; // in items [0,d->buffer.d_bufsize)
- uint64_t d_abs_read_offset; // num items seen since the start
- std::weak_ptr<block> d_link; // block that reads via this buffer reader
- unsigned d_attr_delay; // sample delay attribute for tag propagation
-
- //! constructor is private. Use gr::buffer::add_reader to create instances
- buffer_reader(buffer_sptr buffer, unsigned int read_index, block_sptr link);
-};
-
-//! returns # of buffer_readers currently allocated
-GR_RUNTIME_API long buffer_reader_ncurrently_allocated();
-
} /* namespace gr */
#endif /* INCLUDED_GR_RUNTIME_BUFFER_H */
diff --git a/gnuradio-runtime/include/gnuradio/buffer_double_mapped.h b/gnuradio-runtime/include/gnuradio/buffer_double_mapped.h
new file mode 100644
index 0000000000..b8cc7cdc87
--- /dev/null
+++ b/gnuradio-runtime/include/gnuradio/buffer_double_mapped.h
@@ -0,0 +1,116 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2020 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifndef INCLUDED_GR_RUNTIME_BUFFER_DOUBLE_MAPPED_H
+#define INCLUDED_GR_RUNTIME_BUFFER_DOUBLE_MAPPED_H
+
+#include <gnuradio/api.h>
+#include <gnuradio/buffer.h>
+#include <gnuradio/logger.h>
+#include <gnuradio/runtime_types.h>
+
+namespace gr {
+
+class vmcircbuf;
+
+/*!
+ * \brief Note this function is only used and really intended to be used in
+ * qa_buffer.cc for the unit tests of buffer_double_mapped.
+ *
+ */
+GR_RUNTIME_API buffer_sptr make_buffer_double_mapped(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ block_sptr link = block_sptr());
+
+/*!
+ * \brief Single writer, multiple reader fifo.
+ * \ingroup internal
+ */
+class GR_RUNTIME_API buffer_double_mapped : public buffer
+{
+public:
+ gr::logger_ptr d_logger;
+ gr::logger_ptr d_debug_logger;
+
+ virtual ~buffer_double_mapped();
+
+ /*!
+ * \brief return number of items worth of space available for writing
+ */
+ virtual int space_available();
+
+protected:
+ /*!
+ * sets d_vmcircbuf, d_base, d_bufsize.
+ * returns true iff successful.
+ */
+ bool allocate_buffer(int nitems, size_t sizeof_item);
+
+ virtual unsigned index_add(unsigned a, unsigned b)
+ {
+ unsigned s = a + b;
+
+ if (s >= d_bufsize)
+ s -= d_bufsize;
+
+ assert(s < d_bufsize);
+ return s;
+ }
+
+ virtual unsigned index_sub(unsigned a, unsigned b)
+ {
+ int s = a - b;
+
+ if (s < 0)
+ s += d_bufsize;
+
+ assert((unsigned)s < d_bufsize);
+ return s;
+ }
+
+private:
+ friend class buffer_reader;
+
+ friend GR_RUNTIME_API buffer_sptr make_buffer(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ block_sptr link,
+ block_sptr buf_owner);
+ friend GR_RUNTIME_API buffer_sptr make_buffer_double_mapped(
+ int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems, block_sptr link);
+
+ std::unique_ptr<gr::vmcircbuf> d_vmcircbuf;
+
+ /*!
+ * \brief constructor is private. Use gr_make_buffer to create instances.
+ *
+ * Allocate a buffer that holds at least \p nitems of size \p sizeof_item.
+ *
+ * \param nitems is the minimum number of items the buffer will hold.
+ * \param sizeof_item is the size of an item in bytes.
+ * \param downstream_lcm_nitems is the least common multiple of the items to
+ * read by downstream blocks
+ * \param link is the block that writes to this buffer.
+ *
+ * The total size of the buffer will be rounded up to a system
+ * dependent boundary. This is typically the system page size, but
+ * under MS windows is 64KB.
+ */
+ buffer_double_mapped(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ block_sptr link);
+};
+
+} /* namespace gr */
+
+
+#endif /* INCLUDED_GR_RUNTIME_BUFFER_DOUBLE_MAPPED_H */
diff --git a/gnuradio-runtime/include/gnuradio/buffer_reader.h b/gnuradio-runtime/include/gnuradio/buffer_reader.h
new file mode 100644
index 0000000000..a7e51d02e9
--- /dev/null
+++ b/gnuradio-runtime/include/gnuradio/buffer_reader.h
@@ -0,0 +1,192 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2009-2011,2013 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifndef INCLUDED_GR_RUNTIME_BUFFER_READER_H
+#define INCLUDED_GR_RUNTIME_BUFFER_READER_H
+
+#include <gnuradio/api.h>
+#include <gnuradio/buffer.h>
+#include <gnuradio/logger.h>
+#include <gnuradio/runtime_types.h>
+#include <gnuradio/tags.h>
+#include <gnuradio/thread/thread.h>
+#include <boost/weak_ptr.hpp>
+#include <map>
+#include <memory>
+
+namespace gr {
+
+class buffer_reader_sm;
+
+/*!
+ * \brief Create a new gr::buffer_reader and attach it to buffer \p buf
+ * \param buf is the buffer the \p gr::buffer_reader reads from.
+ * \param nzero_preload -- number of zero items to "preload" into buffer.
+ * \param link is the block that reads from the buffer using this gr::buffer_reader.
+ * \param delay Optional setting to declare the buffer's sample delay.
+ */
+GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf,
+ int nzero_preload,
+ block_sptr link = block_sptr(),
+ int delay = 0);
+
+//! returns # of buffers currently allocated
+GR_RUNTIME_API long buffer_ncurrently_allocated();
+
+
+// ---------------------------------------------------------------------------
+
+/*!
+ * \brief How we keep track of the readers of a gr::buffer.
+ * \ingroup internal
+ */
+class GR_RUNTIME_API buffer_reader
+{
+public:
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ gr::logger_ptr d_logger;
+ gr::logger_ptr d_debug_logger;
+#endif
+
+ virtual ~buffer_reader();
+
+ /*!
+ * Declares the sample delay for this reader.
+ *
+ * See gr::block::declare_sample_delay for details.
+ *
+ * \param delay The new sample delay
+ */
+ void declare_sample_delay(unsigned delay);
+
+ /*!
+ * Gets the sample delay for this reader.
+ *
+ * See gr::block::sample_delay for details.
+ */
+ unsigned sample_delay() const;
+
+ /*!
+ * \brief Return number of items available for reading.
+ */
+ virtual int items_available(); // const
+
+ /*!
+ * \brief Return buffer this reader reads from.
+ */
+ buffer_sptr buffer() const { return d_buffer; }
+
+ /*!
+ * \brief Return maximum number of items that could ever be available for reading.
+ * This is used as a sanity check in the scheduler to avoid looping forever.
+ */
+ int max_possible_items_available() const { return d_buffer->bufsize() - 1; }
+
+ /*!
+ * \brief return pointer to read buffer.
+ *
+ * The return value points to items_available() number of items
+ */
+ const void* read_pointer();
+
+ /*
+ * \brief tell buffer we read \p items from it
+ */
+ void update_read_pointer(int nitems);
+
+ void set_done(bool done) { d_buffer->set_done(done); }
+ bool done() const { return d_buffer->done(); }
+
+ gr::thread::mutex* mutex() { return d_buffer->mutex(); }
+
+ uint64_t nitems_read() const { return d_abs_read_offset; }
+
+ void reset_nitem_counter()
+ {
+ d_read_index = 0;
+ d_abs_read_offset = 0;
+ }
+
+ size_t get_sizeof_item() { return d_buffer->get_sizeof_item(); }
+
+ /*!
+ * \brief Return the block that reads via this reader.
+ *
+ */
+ block_sptr link() { return block_sptr(d_link); }
+
+ /*!
+ * \brief Given a [start,end), returns a vector all tags in the range.
+ *
+ * Get a vector of tags in given range. Range of counts is from start to end-1.
+ *
+ * Tags are tuples of:
+ * (item count, source id, key, value)
+ *
+ * \param v a vector reference to return tags into
+ * \param abs_start a uint64 count of the start of the range of interest
+ * \param abs_end a uint64 count of the end of the range of interest
+ * \param id the unique ID of the block to make sure already deleted tags
+ * are not returned
+ */
+ void get_tags_in_range(std::vector<tag_t>& v,
+ uint64_t abs_start,
+ uint64_t abs_end,
+ long id);
+
+ /*!
+ * \brief Returns true when the current thread is ready to call the callback,
+ * false otherwise. Note if input_blocked_callback is overridden then this
+ * function should also be overridden.
+ */
+ virtual bool input_blkd_cb_ready(int items_required) const { return false; }
+
+ /*!
+ * \brief Callback function that the scheduler will call when it determines
+ * that the input is blocked. Override this function if needed.
+ */
+ virtual bool input_blocked_callback(int items_required, int items_avail)
+ {
+ return false;
+ }
+
+ // -------------------------------------------------------------------------
+ unsigned int get_read_index() const { return d_read_index; }
+ uint64_t get_abs_read_offset() const { return d_abs_read_offset; }
+
+protected:
+ friend class buffer;
+ friend class buffer_double_mapped;
+ friend class buffer_single_mapped;
+ friend class buffer_reader_sm;
+
+ friend GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf,
+ int nzero_preload,
+ block_sptr link,
+ int delay);
+
+ buffer_sptr d_buffer;
+ unsigned int d_read_index; // in items [0,d->buffer.d_bufsize) ** see NB
+ uint64_t d_abs_read_offset; // num items seen since the start ** see NB
+ std::weak_ptr<block> d_link; // block that reads via this buffer reader
+ unsigned d_attr_delay; // sample delay attribute for tag propagation
+ // ** NB: buffer::d_mutex protects d_read_index and d_abs_read_offset
+
+ //! constructor is private. Use gr::buffer::add_reader to create instances
+ buffer_reader(buffer_sptr buffer, unsigned int read_index, block_sptr link);
+};
+
+//! returns # of buffer_readers currently allocated
+GR_RUNTIME_API long buffer_reader_ncurrently_allocated();
+
+} /* namespace gr */
+
+#endif /* INCLUDED_GR_RUNTIME_BUFFER_READER_H */
diff --git a/gnuradio-runtime/include/gnuradio/buffer_reader_sm.h b/gnuradio-runtime/include/gnuradio/buffer_reader_sm.h
new file mode 100644
index 0000000000..627bfd8582
--- /dev/null
+++ b/gnuradio-runtime/include/gnuradio/buffer_reader_sm.h
@@ -0,0 +1,65 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2009-2011,2013 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifndef INCLUDED_GR_RUNTIME_BUFFER_READER_SM_H
+#define INCLUDED_GR_RUNTIME_BUFFER_READER_SM_H
+
+#include <gnuradio/api.h>
+#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_reader.h>
+#include <gnuradio/logger.h>
+#include <gnuradio/runtime_types.h>
+#include <gnuradio/tags.h>
+#include <gnuradio/thread/thread.h>
+#include <boost/weak_ptr.hpp>
+#include <map>
+#include <memory>
+
+namespace gr {
+
+class GR_RUNTIME_API buffer_reader_sm : public buffer_reader
+{
+public:
+ ~buffer_reader_sm();
+
+ /*!
+ * \brief Return number of items available for reading.
+ */
+ virtual int items_available(); // const
+
+ /*!
+ * \brief Return true if thread is ready to call input_blocked_callback,
+ * false otherwise
+ */
+ virtual bool input_blkd_cb_ready(int items_required) const;
+
+ /*!
+ * \brief Callback function that the scheduler will call when it determines
+ * that the input is blocked
+ */
+ virtual bool input_blocked_callback(int items_required, int items_avail);
+
+private:
+ friend class buffer;
+ friend class buffer_double_mapped;
+ friend class buffer_single_mapped;
+
+ friend GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf,
+ int nzero_preload,
+ block_sptr link,
+ int delay);
+
+ //! constructor is private. Use gr::buffer::add_reader to create instances
+ buffer_reader_sm(buffer_sptr buffer, unsigned int read_index, block_sptr link);
+};
+
+} // namespace gr
+
+#endif /* INCLUDED_GR_RUNTIME_BUFFER_READER_SM_H */
diff --git a/gnuradio-runtime/include/gnuradio/buffer_single_mapped.h b/gnuradio-runtime/include/gnuradio/buffer_single_mapped.h
new file mode 100644
index 0000000000..5377534aa8
--- /dev/null
+++ b/gnuradio-runtime/include/gnuradio/buffer_single_mapped.h
@@ -0,0 +1,167 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2020 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifndef INCLUDED_GR_RUNTIME_BUFFER_SINGLE_MAPPED_H
+#define INCLUDED_GR_RUNTIME_BUFFER_SINGLE_MAPPED_H
+
+#include <functional>
+
+#include <gnuradio/api.h>
+#include <gnuradio/block.h>
+#include <gnuradio/buffer.h>
+#include <gnuradio/logger.h>
+#include <gnuradio/runtime_types.h>
+
+namespace gr {
+
+/*!
+ * TODO: update this
+ *
+ * \brief Single writer, multiple reader fifo.
+ * \ingroup internal
+ */
+class GR_RUNTIME_API buffer_single_mapped : public buffer
+{
+public:
+ gr::logger_ptr d_logger;
+ gr::logger_ptr d_debug_logger;
+
+ virtual ~buffer_single_mapped();
+
+ /*!
+ * \brief Return the block that owns this buffer.
+ */
+ block_sptr buf_owner() { return d_buf_owner; }
+
+ /*!
+ * \brief return number of items worth of space available for writing
+ */
+ virtual int space_available();
+
+ virtual void update_reader_block_history(unsigned history, int delay)
+ {
+ unsigned old_max = d_max_reader_history;
+ d_max_reader_history = std::max(d_max_reader_history, history);
+ if (d_max_reader_history != old_max) {
+ d_write_index = d_max_reader_history - 1;
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "buffer_single_mapped constructor -- set wr index to: "
+ << d_write_index;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // Reset the reader's read index if the buffer's write index has changed.
+ // Note that "history - 1" is the nzero_preload value passed to
+ // buffer_add_reader.
+ for (auto reader : d_readers) {
+ reader->d_read_index = d_write_index - (reader->link()->history() - 1);
+ }
+ }
+
+ // Only attempt to set has history flag if it is not already set
+ if (!d_has_history) {
+ // Blocks that set delay may set history to delay + 1 but this is
+ // not "real" history
+ d_has_history = ((static_cast<int>(history) - 1) != delay);
+ }
+ }
+
+ void deleter(char* ptr)
+ {
+ // Delegate free of the underlying buffer to the block that owns it
+ if (ptr != nullptr)
+ buf_owner()->free_custom_buffer(ptr);
+ }
+
+ /*!
+ * \brief Return true if thread is ready to call the callback, false otherwise
+ */
+ virtual bool output_blkd_cb_ready(int output_multiple);
+
+ /*!
+ * \brief Callback function that the scheduler will call when it determines
+ * that the output is blocked
+ */
+ virtual bool output_blocked_callback(int output_multiple, bool force);
+
+protected:
+ /*!
+ * sets d_base, d_bufsize.
+ * returns true iff successful.
+ */
+ bool allocate_buffer(int nitems, size_t sizeof_item, uint64_t downstream_lcm_nitems);
+
+ virtual unsigned index_add(unsigned a, unsigned b)
+ {
+ unsigned s = a + b;
+
+ if (s >= d_bufsize)
+ s -= d_bufsize;
+
+ assert(s < d_bufsize);
+ return s;
+ }
+
+ virtual unsigned index_sub(unsigned a, unsigned b)
+ {
+ // NOTE: a is writer ptr and b is read ptr
+ int s = a - b;
+
+ if (s < 0)
+ s = d_bufsize - b;
+
+ assert((unsigned)s < d_bufsize);
+ return s;
+ }
+
+private:
+ friend class buffer_reader;
+
+ friend GR_RUNTIME_API buffer_sptr make_buffer(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ block_sptr link,
+ block_sptr buf_owner);
+
+ block_sptr d_buf_owner; // block that "owns" this buffer
+
+ std::unique_ptr<char, std::function<void(char*)>> d_buffer;
+
+ /*!
+ * \brief constructor is private. Use gr_make_buffer to create instances.
+ *
+ * Allocate a buffer that holds at least \p nitems of size \p sizeof_item.
+ *
+ * \param nitems is the minimum number of items the buffer will hold.
+ * \param sizeof_item is the size of an item in bytes.
+ * \param downstream_lcm_nitems is the least common multiple of the items to
+ * read by downstream blocks
+ * \param link is the block that writes to this buffer.
+ * \param buf_owner if the block that owns the buffer which may or may not
+ * be the same as the block that writes to this buffer
+ *
+ * The total size of the buffer will be rounded up to a system
+ * dependent boundary. This is typically the system page size, but
+ * under MS windows is 64KB.
+ */
+ buffer_single_mapped(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ block_sptr link,
+ block_sptr buf_owner);
+};
+
+} /* namespace gr */
+
+
+#endif /* INCLUDED_GR_RUNTIME_BUFFER_SINGLE_MAPPED_H */
diff --git a/gnuradio-runtime/include/gnuradio/buffer_type.h b/gnuradio-runtime/include/gnuradio/buffer_type.h
new file mode 100644
index 0000000000..527b5ff8e3
--- /dev/null
+++ b/gnuradio-runtime/include/gnuradio/buffer_type.h
@@ -0,0 +1,88 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2020 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifndef INCLUDED_GR_RUNTIME_CUSTOM_BUFFER_TYPE_H
+#define INCLUDED_GR_RUNTIME_CUSTOM_BUFFER_TYPE_H
+
+#include <gnuradio/api.h>
+
+#include <cstdint>
+#include <mutex>
+#include <string>
+
+namespace gr {
+
+
+class GR_RUNTIME_API buffer_type_base
+{
+public:
+ virtual ~buffer_type_base(){};
+
+ // Do not allow copying or assignment
+ buffer_type_base(buffer_type_base const&) = delete;
+ void operator=(buffer_type_base const&) = delete;
+
+ // Allow equality and inequality comparison
+ bool operator==(const buffer_type_base& other) const
+ {
+ return d_value == other.d_value;
+ }
+
+ bool operator!=(const buffer_type_base& other) const
+ {
+ return d_value != other.d_value;
+ }
+
+ // Do not allow other comparison (just in case)
+ bool operator<(const buffer_type_base& other) = delete;
+ bool operator>(const buffer_type_base& other) = delete;
+ bool operator<=(const buffer_type_base& other) = delete;
+ bool operator>=(const buffer_type_base& other) = delete;
+
+ const std::string& name() const { return d_name; }
+
+protected:
+ static uint32_t s_nextId;
+ static std::mutex s_mutex;
+
+ uint32_t d_value;
+ std::string d_name;
+
+ // Private constructor
+ buffer_type_base(const char* name) : d_name(name)
+ {
+ std::lock_guard<std::mutex> lock(s_mutex);
+ d_value = s_nextId++;
+ }
+};
+
+typedef const buffer_type_base& buffer_type_t;
+
+
+#define MAKE_CUSTOM_BUFFER_TYPE(CLASSNAME) \
+ class GR_RUNTIME_API buftype_##CLASSNAME : public buffer_type_base \
+ { \
+ public: \
+ static buffer_type_t get() \
+ { \
+ static buftype_##CLASSNAME instance; \
+ return instance; \
+ } \
+ \
+ private: \
+ buftype_##CLASSNAME() : buffer_type_base(#CLASSNAME) {} \
+ };
+
+MAKE_CUSTOM_BUFFER_TYPE(DEFAULT_NON_CUSTOM);
+MAKE_CUSTOM_BUFFER_TYPE(CUSTOM_HOST); // used only for test purposes
+
+} // namespace gr
+
+#endif /* INCLUDED_GR_RUNTIME_CUSTOM_BUFFER_TYPE_H */ \ No newline at end of file
diff --git a/gnuradio-runtime/include/gnuradio/custom_lock.h b/gnuradio-runtime/include/gnuradio/custom_lock.h
new file mode 100644
index 0000000000..6422254d99
--- /dev/null
+++ b/gnuradio-runtime/include/gnuradio/custom_lock.h
@@ -0,0 +1,64 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2021 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifndef INCLUDED_GR_CUSTOM_LOCK_H
+#define INCLUDED_GR_CUSTOM_LOCK_H
+
+#include <gnuradio/api.h>
+#include <gnuradio/logger.h>
+#include <gnuradio/thread/thread.h>
+
+namespace gr {
+
+/*!
+ * Custom lock interface. Objects should implement this interface in order to
+ * use the custom_lock object below.
+ */
+class custom_lock_if
+{
+public:
+ /*!
+ * This function will be executed on construction of the custom lock.
+ */
+ virtual void on_lock(gr::thread::scoped_lock& lock) = 0;
+
+ /*!
+ * This function will be executed on destruction of the custom lock.
+ */
+ virtual void on_unlock() = 0;
+};
+
+/*!
+ * Write me!
+ */
+class custom_lock
+{
+public:
+ explicit custom_lock(gr::thread::mutex& mutex, std::shared_ptr<custom_lock_if> locker)
+ : d_mutex(mutex), d_lock(mutex), d_locker(locker)
+ {
+ d_locker->on_lock(d_lock);
+ }
+
+ ~custom_lock() { d_locker->on_unlock(); }
+
+ // Disallow copying and assignment
+ custom_lock(custom_lock const&) = delete;
+ custom_lock& operator=(custom_lock const&) = delete;
+
+private:
+ gr::thread::mutex& d_mutex;
+ gr::thread::scoped_lock d_lock;
+ std::shared_ptr<custom_lock_if> d_locker;
+};
+
+} /* namespace gr */
+
+#endif /* INCLUDED_GR_CUSTOM_LOCK_H */ \ No newline at end of file
diff --git a/gnuradio-runtime/include/gnuradio/logger.h b/gnuradio-runtime/include/gnuradio/logger.h
index 6891706fb0..e9409c93a3 100644
--- a/gnuradio-runtime/include/gnuradio/logger.h
+++ b/gnuradio-runtime/include/gnuradio/logger.h
@@ -40,7 +40,7 @@ namespace gr {
* \brief GR_LOG macros
* \ingroup logging
*
- * These macros wrap the standard LOG4CPP_LEVEL macros. The availablie macros
+ * These macros wrap the standard LOG4CPP_LEVEL macros. The available macros
* are:
* LOG_DEBUG
* LOG_INFO
diff --git a/gnuradio-runtime/lib/CMakeLists.txt b/gnuradio-runtime/lib/CMakeLists.txt
index 42486de801..ca96549386 100644
--- a/gnuradio-runtime/lib/CMakeLists.txt
+++ b/gnuradio-runtime/lib/CMakeLists.txt
@@ -55,6 +55,11 @@ add_library(gnuradio-runtime
block_gateway_impl.cc
block_registry.cc
buffer.cc
+ buffer_double_mapped.cc
+ buffer_reader.cc
+ buffer_reader_sm.cc
+ buffer_single_mapped.cc
+ buffer_type.cc
flat_flowgraph.cc
flowgraph.cc
hier_block2.cc
diff --git a/gnuradio-runtime/lib/block.cc b/gnuradio-runtime/lib/block.cc
index cddf5a5baa..bb6ce95298 100644
--- a/gnuradio-runtime/lib/block.cc
+++ b/gnuradio-runtime/lib/block.cc
@@ -24,6 +24,12 @@
namespace gr {
+// Moved from flat_flowgraph.cc
+// 32Kbyte buffer size between blocks
+#define GR_FIXED_BUFFER_SIZE (32 * (1L << 10))
+
+static const unsigned int s_fixed_buffer_size = GR_FIXED_BUFFER_SIZE;
+
block::block(const std::string& name,
io_signature::sptr input_signature,
io_signature::sptr output_signature)
@@ -375,11 +381,146 @@ void block::set_min_output_buffer(int port, long min_output_buffer)
d_min_output_buffer[port] = min_output_buffer;
}
+void block::allocate_detail(int ninputs,
+ int noutputs,
+ const std::vector<int>& downstream_max_nitems_vec,
+ const std::vector<uint64_t>& downstream_lcm_nitems_vec)
+{
+ block_detail_sptr detail = make_block_detail(ninputs, noutputs);
+
+ GR_LOG_DEBUG(d_debug_logger, "Creating block detail for " + identifier());
+
+ for (int i = 0; i < noutputs; i++) {
+ expand_minmax_buffer(i);
+
+ buffer_sptr buffer = allocate_buffer(
+ i, downstream_max_nitems_vec[i], downstream_lcm_nitems_vec[i]);
+ GR_LOG_DEBUG(d_debug_logger,
+ "Allocated buffer for output " + identifier() + " " +
+ std::to_string(i));
+ detail->set_output(i, buffer);
+
+ // Update the block's max_output_buffer based on what was actually allocated.
+ if ((max_output_buffer(i) != buffer->bufsize()) && (max_output_buffer(i) != -1))
+ GR_LOG_WARN(d_logger,
+ boost::format("Block (%1%) max output buffer set to %2%"
+ " instead of requested %3%") %
+ alias() % buffer->bufsize() % max_output_buffer(i));
+ set_max_output_buffer(i, buffer->bufsize());
+ }
+
+ // Store the block_detail that was created above
+ set_detail(detail);
+}
+
+buffer_sptr block::replace_buffer(uint32_t out_port, block_sptr block_owner)
+{
+ block_detail_sptr detail_ = detail();
+ buffer_sptr orig_buffer = detail_->output(out_port);
+
+ // Make a new buffer but this time use the passed in block as the owner
+ buffer_sptr new_buffer = make_buffer(orig_buffer->bufsize(),
+ orig_buffer->get_sizeof_item(),
+ orig_buffer->get_downstream_lcm_nitems(),
+ shared_from_base<block>(),
+ block_owner);
+
+ detail_->set_output(out_port, new_buffer);
+ return new_buffer;
+}
bool block::update_rate() const { return d_update_rate; }
void block::enable_update_rate(bool en) { d_update_rate = en; }
+buffer_sptr block::allocate_buffer(int port,
+ int downstream_max_nitems,
+ uint64_t downstream_lcm_nitems)
+{
+ int item_size = output_signature()->sizeof_stream_item(port);
+
+ // *2 because we're now only filling them 1/2 way in order to
+ // increase the available parallelism when using the TPB scheduler.
+ // (We're double buffering, where we used to single buffer)
+ int nitems = s_fixed_buffer_size * 2 / item_size;
+
+ // Make sure there are at least twice the output_multiple no. of items
+ if (nitems < 2 * output_multiple()) // Note: this means output_multiple()
+ nitems = 2 * output_multiple(); // can't be changed by block dynamically
+
+ // Limit buffer size if indicated
+ if (max_output_buffer(port) > 0) {
+ // std::cout << "constraining output items to " << block->max_output_buffer(port)
+ // << "\n";
+ nitems = std::min((long)nitems, (long)max_output_buffer(port));
+ nitems -= nitems % output_multiple();
+ if (nitems < 1)
+ throw std::runtime_error("problems allocating a buffer with the given max "
+ "output buffer constraint!");
+ } else if (min_output_buffer(port) > 0) {
+ nitems = std::max((long)nitems, (long)min_output_buffer(port));
+ nitems -= nitems % output_multiple();
+ if (nitems < 1)
+ throw std::runtime_error("problems allocating a buffer with the given min "
+ "output buffer constraint!");
+ }
+
+ // If any downstream blocks are decimators and/or have a large output_multiple,
+ // ensure we have a buffer at least twice their decimation factor*output_multiple
+ nitems = std::max(nitems, downstream_max_nitems);
+
+ // We're going to let this fail once and retry. If that fails, throw and exit.
+ buffer_sptr buf;
+
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ GR_LOG_DEBUG(d_logger,
+ "Block: " + name() + " allocated buffer for output " + identifier());
+#endif
+
+ try {
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ std::ostringstream msg;
+ msg << "downstream_max_nitems: " << downstream_max_nitems
+ << " -- downstream_lcm_nitems: " << downstream_lcm_nitems
+ << " -- output_multiple(): " << output_multiple()
+ << " -- out_mult_set: " << output_multiple_set() << " -- nitems: " << nitems
+ << " -- history: " << history() << " -- relative_rate: " << relative_rate();
+ if (relative_rate() != 1.0) {
+ msg << " (" << relative_rate_i() << " / " << relative_rate_d() << ")";
+ }
+ msg << " -- fixed_rate: " << fixed_rate();
+ if (fixed_rate()) {
+ int num_inputs = fixed_rate_noutput_to_ninput(1) - (history() - 1);
+ msg << " (" << num_inputs << " -> "
+ << fixed_rate_ninput_to_noutput(num_inputs + (history() - 1)) << ")";
+ }
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+ buf = make_buffer(nitems,
+ item_size,
+ downstream_lcm_nitems,
+ shared_from_base<block>(),
+ shared_from_base<block>());
+
+ } catch (std::bad_alloc&) {
+ buf = make_buffer(nitems,
+ item_size,
+ downstream_lcm_nitems,
+ shared_from_base<block>(),
+ shared_from_base<block>());
+ }
+
+ // Set the max noutput items size here to make sure it's always
+ // set in the block and available in the start() method.
+ // But don't overwrite if the user has set this externally.
+ if (!is_set_max_noutput_items())
+ set_max_noutput_items(nitems);
+
+ return buf;
+}
+
float block::pc_noutput_items()
{
if (d_detail) {
diff --git a/gnuradio-runtime/lib/block_detail.cc b/gnuradio-runtime/lib/block_detail.cc
index 239223603d..f5283c56b0 100644
--- a/gnuradio-runtime/lib/block_detail.cc
+++ b/gnuradio-runtime/lib/block_detail.cc
@@ -14,6 +14,7 @@
#include <gnuradio/block_detail.h>
#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_reader.h>
#include <gnuradio/logger.h>
namespace gr {
diff --git a/gnuradio-runtime/lib/block_executor.cc b/gnuradio-runtime/lib/block_executor.cc
index c026934bd9..6fbf2c5c17 100644
--- a/gnuradio-runtime/lib/block_executor.cc
+++ b/gnuradio-runtime/lib/block_executor.cc
@@ -14,13 +14,13 @@
#include <gnuradio/block.h>
#include <gnuradio/block_detail.h>
-#include <gnuradio/buffer.h>
-#include <gnuradio/logger.h>
+#include <gnuradio/custom_lock.h>
#include <gnuradio/prefs.h>
#include <block_executor.h>
#include <limits>
#include <sstream>
+
namespace gr {
// must be defined to either 0 or 1
@@ -53,27 +53,55 @@ inline static unsigned int round_down(unsigned int n, unsigned int multiple)
// buffers or -1 if we're output blocked and the output we're
// blocked on is done.
//
-static int
-min_available_space(block_detail* d, int output_multiple, int min_noutput_items)
+static int min_available_space(block* m,
+ block_detail* d,
+ int output_multiple,
+ int min_noutput_items,
+ int& output_idx)
{
+ gr::logger_ptr logger;
+ gr::logger_ptr debug_logger;
+ gr::configure_default_loggers(logger, debug_logger, "min_available_space");
+
int min_space = std::numeric_limits<int>::max();
if (min_noutput_items == 0)
min_noutput_items = 1;
- for (int i = 0; i < d->noutputs(); i++) {
+ for (int i = output_idx; i < d->noutputs(); i++) {
buffer_sptr out_buf = d->output(i);
gr::thread::scoped_lock guard(*out_buf->mutex());
- int avail_n = round_down(out_buf->space_available(), output_multiple);
+ int space_avail = out_buf->space_available();
+ int avail_n = round_down(space_avail, output_multiple);
+ // If not strictly output multiple size aligned, potentially use all
+ // space available in the output buffer
+ if (avail_n == 0 && space_avail < output_multiple && !m->output_multiple_set()) {
+ avail_n = std::max(avail_n, space_avail);
+ }
int best_n = round_down(out_buf->bufsize() / 2, output_multiple);
if (best_n < min_noutput_items)
throw std::runtime_error("Buffer too small for min_noutput_items");
int n = std::min(avail_n, best_n);
if (n < min_noutput_items) { // We're blocked on output.
- if (out_buf->done()) { // Downstream is done, therefore we're done.
+ LOG(std::ostringstream msg;
+ msg << m << " **[i=" << i << "] output_multiple=" << output_multiple
+ << " min_noutput_items=" << min_noutput_items
+ << " avail_n=" << avail_n << " best_n=" << best_n << " n=" << n
+ << " min_space=" << min_space << " outbuf_done=" << out_buf->done();
+ GR_LOG_INFO(debug_logger, msg.str()););
+
+ if (out_buf->done()) { // Downstream is done, therefore we're done.
return -1;
}
+
+ output_idx = i;
return 0;
}
min_space = std::min(min_space, n);
+
+ LOG(std::ostringstream msg;
+ msg << m << " [i=" << i << "] output_multiple=" << output_multiple
+ << " min_noutput_items=" << min_noutput_items << " avail_n=" << avail_n
+ << " best_n=" << best_n << " n=" << n << " min_space=" << min_space;
+ GR_LOG_INFO(debug_logger, msg.str()););
}
return min_space;
}
@@ -233,6 +261,7 @@ block_executor::state block_executor::run_one_iteration()
int max_noutput_items;
int new_alignment = 0;
int alignment_state = -1;
+ int output_idx = 0;
block* m = d_block.get();
block_detail* d = m->detail().get();
@@ -255,8 +284,10 @@ block_executor::state block_executor::run_one_iteration()
d_start_nitems_read.resize(0);
// determine the minimum available output space
- noutput_items =
- min_available_space(d, m->output_multiple(), m->min_noutput_items());
+ output_idx = 0;
+ out_try_again:
+ noutput_items = min_available_space(
+ m, d, m->output_multiple(), m->min_noutput_items(), output_idx);
noutput_items = std::min(noutput_items, max_noutput_items);
LOG(std::ostringstream msg; msg << "source: noutput_items = " << noutput_items;
GR_LOG_INFO(d_debug_logger, msg.str()););
@@ -264,8 +295,28 @@ block_executor::state block_executor::run_one_iteration()
goto were_done;
if (noutput_items == 0) { // we're output blocked
- LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT"););
- return BLKD_OUT;
+ LOG(std::ostringstream msg; msg << m << " -- BLKD_OUT";
+ GR_LOG_INFO(d_debug_logger, msg.str()););
+
+ buffer_sptr out_buf = d->output(output_idx);
+
+ if (out_buf->output_blkd_cb_ready(m->output_multiple())) {
+ gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf);
+ if (!out_buf->output_blocked_callback(m->output_multiple())) {
+ LOG(std::ostringstream msg;
+ msg << m << " -- BLKD_OUT -- ([1] callback FAILED)";
+ GR_LOG_INFO(d_debug_logger, msg.str()););
+ return BLKD_OUT;
+ } else {
+ LOG(std::ostringstream msg;
+ msg << m << " -- BLKD_OUT -- ([1] try again idx: " << output_idx
+ << ")";
+ GR_LOG_INFO(d_debug_logger, msg.str()););
+ goto out_try_again;
+ }
+ } else {
+ return BLKD_OUT;
+ }
}
goto setup_call_to_work; // jump to common code
@@ -278,14 +329,14 @@ block_executor::state block_executor::run_one_iteration()
d_input_done.resize(d->ninputs());
d_output_items.resize(0);
d_start_nitems_read.resize(d->ninputs());
- LOG(GR_LOG_INFO(d_debug_logger, "sink"););
+ // LOG(GR_LOG_INFO(d_debug_logger, "sink"););
+ LOG(std::ostringstream msg; msg << m << " -- sink";
+ GR_LOG_INFO(d_debug_logger, msg.str()););
max_items_avail = 0;
for (int i = 0; i < d->ninputs(); i++) {
{
- /*
- * Acquire the mutex and grab local copies of items_available and done.
- */
+ // Acquire the mutex and grab local copies of items_available and done.
buffer_reader_sptr in_buf = d->input(i);
gr::thread::scoped_lock guard(*in_buf->mutex());
d_ninput_items[i] = in_buf->items_available();
@@ -293,10 +344,10 @@ block_executor::state block_executor::run_one_iteration()
}
LOG(std::ostringstream msg;
- msg << "d_ninput_items[" << i << "] = " << d_ninput_items[i];
+ msg << m << " -- d_ninput_items[" << i << "] = " << d_ninput_items[i];
GR_LOG_INFO(d_debug_logger, msg.str()););
LOG(std::ostringstream msg;
- msg << "d_input_done[" << i << "] = " << d_input_done[i];
+ msg << m << " -- d_input_done[" << i << "] = " << d_input_done[i];
GR_LOG_INFO(d_debug_logger, msg.str()););
if (d_ninput_items[i] < m->output_multiple() && d_input_done[i])
@@ -309,13 +360,16 @@ block_executor::state block_executor::run_one_iteration()
noutput_items = (int)(max_items_avail * m->relative_rate());
noutput_items = round_down(noutput_items, m->output_multiple());
noutput_items = std::min(noutput_items, max_noutput_items);
- LOG(std::ostringstream msg; msg << "max_items_avail = " << max_items_avail;
+ LOG(std::ostringstream msg;
+ msg << m << " -- max_items_avail = " << max_items_avail;
GR_LOG_INFO(d_debug_logger, msg.str()););
- LOG(std::ostringstream msg; msg << "noutput_items = " << noutput_items;
+ LOG(std::ostringstream msg; msg << m << " -- noutput_items = " << noutput_items;
GR_LOG_INFO(d_debug_logger, msg.str()););
if (noutput_items == 0) { // we're blocked on input
- LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN"););
+ // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN"););
+ LOG(std::ostringstream msg; msg << m << " -- BLKD_IN";
+ GR_LOG_INFO(d_debug_logger, msg.str()));
return BLKD_IN;
}
@@ -331,12 +385,11 @@ block_executor::state block_executor::run_one_iteration()
d_output_items.resize(d->noutputs());
d_start_nitems_read.resize(d->ninputs());
+ blkd_in_try_again:
max_items_avail = 0;
for (int i = 0; i < d->ninputs(); i++) {
{
- /*
- * Acquire the mutex and grab local copies of items_available and done.
- */
+ // Acquire the mutex and grab local copies of items_available and done.
buffer_reader_sptr in_buf = d->input(i);
gr::thread::scoped_lock guard(*in_buf->mutex());
d_ninput_items[i] = in_buf->items_available();
@@ -346,11 +399,13 @@ block_executor::state block_executor::run_one_iteration()
}
// determine the minimum available output space
- noutput_items =
- min_available_space(d, m->output_multiple(), m->min_noutput_items());
+ output_idx = 0;
+ out_try_again2:
+ noutput_items = min_available_space(
+ m, d, m->output_multiple(), m->min_noutput_items(), output_idx);
if (ENABLE_LOGGING) {
std::ostringstream msg;
- msg << "regular ";
+ msg << m << " -- regular ";
msg << m->relative_rate_i() << ":" << m->relative_rate_d();
msg << " max_items_avail = " << max_items_avail;
msg << " noutput_items = " << noutput_items;
@@ -360,13 +415,36 @@ block_executor::state block_executor::run_one_iteration()
goto were_done;
if (noutput_items == 0) { // we're output blocked
- LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT"););
- return BLKD_OUT;
+ // LOG(GR_LOG_INFO(d_debug_logger, "BLKD_OUT"););
+ LOG(std::ostringstream msg; msg << m << " -- BLKD_OUT";
+ GR_LOG_INFO(d_debug_logger, msg.str()));
+
+ buffer_sptr out_buf = d->output(output_idx);
+
+ if (out_buf->output_blkd_cb_ready(m->output_multiple())) {
+ // Call the output blocked callback which will tell us if it was
+ // able to unblock the output
+ gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf);
+ if (!out_buf->output_blocked_callback(m->output_multiple())) {
+ LOG(std::ostringstream msg;
+ msg << m << " -- BLKD_OUT -- ([2] callback FAILED)";
+ GR_LOG_INFO(d_debug_logger, msg.str()););
+ return BLKD_OUT;
+ } else {
+ LOG(std::ostringstream msg;
+ msg << m << " -- BLKD_OUT -- ([2] try again idx: " << output_idx
+ << ")";
+ GR_LOG_INFO(d_debug_logger, msg.str()););
+ goto out_try_again2;
+ }
+ } else {
+ return BLKD_OUT;
+ }
}
try_again:
if (m->fixed_rate()) {
- // try to work it forward starting with max_items_avail.
+ // Try to work it forward starting with max_items_avail.
// We want to try to consume all the input we've got.
int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
@@ -416,6 +494,10 @@ block_executor::state block_executor::run_one_iteration()
// ask the block how much input they need to produce noutput_items
m->forecast(noutput_items, d_ninput_items_required);
+ LOG(std::ostringstream msg;
+ msg << m << " -- FCAST noutput_items=" << noutput_items << " inputs_required="
+ << d_ninput_items_required[0] << " inputs_avail=" << d_ninput_items[0];
+ GR_LOG_INFO(d_debug_logger, msg.str()));
// See if we've got sufficient input available and make sure we
// didn't overflow on the input.
@@ -445,12 +527,29 @@ block_executor::state block_executor::run_one_iteration()
}
// We're blocked on input
- LOG(GR_LOG_INFO(d_debug_logger, "BLKD_IN"););
+ LOG(std::ostringstream msg; msg << m << " -- BLKD_IN";
+ GR_LOG_INFO(d_debug_logger, msg.str()));
+
+ buffer_reader_sptr in_buf = d->input(i);
+
+ LOG(std::ostringstream msg;
+ msg << m << " (t: " << this << ") -- pre-callback";
+ GR_LOG_DEBUG(d_debug_logger, msg.str()));
+
+ if (in_buf->input_blkd_cb_ready(d_ninput_items_required[i])) {
+ gr::custom_lock lock(std::ref(*in_buf->mutex()), in_buf->buffer());
+ if (in_buf->input_blocked_callback(d_ninput_items_required[i],
+ d_ninput_items[i])) {
+ LOG(std::ostringstream msg; msg << m << " -- BLKD_IN -- TRY AGAIN";
+ GR_LOG_INFO(d_debug_logger, msg.str()));
+ goto blkd_in_try_again;
+ }
+ }
+
if (d_input_done[i]) // If the upstream block is done, we're done
goto were_done;
// Is it possible to ever fulfill this request?
- buffer_reader_sptr in_buf = d->input(i);
if (d_ninput_items_required[i] > in_buf->max_possible_items_available()) {
// Nope, never going to happen...
std::ostringstream msg;
@@ -476,14 +575,18 @@ block_executor::state block_executor::run_one_iteration()
// We've got enough data on each input to produce noutput_items.
// Finish setting up the call to work.
- for (int i = 0; i < d->ninputs(); i++)
+ for (int i = 0; i < d->ninputs(); i++) {
+ d->input(i)->buffer()->increment_active();
d_input_items[i] = d->input(i)->read_pointer();
+ }
setup_call_to_work:
d->d_produce_or = 0;
- for (int i = 0; i < d->noutputs(); i++)
+ for (int i = 0; i < d->noutputs(); i++) {
+ d->output(i)->increment_active();
d_output_items[i] = d->output(i)->write_pointer();
+ }
// determine where to start looking for new tags
for (int i = 0; i < d->ninputs(); i++)
@@ -504,7 +607,10 @@ block_executor::state block_executor::run_one_iteration()
#endif /* GR_PERFORMANCE_COUNTERS */
LOG(std::ostringstream msg;
- msg << "general_work: noutput_items = " << noutput_items << " result = " << n;
+ msg << m << " -- general_work: noutput_items = " << noutput_items
+ << " ninput_items=" << (d->ninputs() >= 1 ? d_ninput_items[0] : 0)
+ << " ninput_req=" << (d->ninputs() >= 1 ? d_ninput_items_required[0] : 0)
+ << " result = " << n;
GR_LOG_INFO(d_debug_logger, msg.str()););
// Adjust number of unaligned items left to process
@@ -521,14 +627,21 @@ block_executor::state block_executor::run_one_iteration()
m->mp_relative_rate(),
m->update_rate(),
d_returned_tags,
- m->unique_id()))
+ m->unique_id())) {
+ d->post_work_cleanup();
goto were_done;
+ }
- if (n == block::WORK_DONE)
+ if (n == block::WORK_DONE) {
+ d->post_work_cleanup();
goto were_done;
+ }
- if (n != block::WORK_CALLED_PRODUCE)
+ if (n != block::WORK_CALLED_PRODUCE) {
d->produce_each(n); // advance write pointers
+ }
+
+ d->post_work_cleanup();
// For some blocks that can change their produce/consume ratio
// (the relative_rate), we might want to automatically update
@@ -558,13 +671,30 @@ block_executor::state block_executor::run_one_iteration()
}
*/
+ // The call to general_work() produced no output therefore the block may
+ // be "effectively output blocked". Call the output blocked callback
+ // just in case, it can do no harm.
+ for (int i = 0; i < d->noutputs(); i++) {
+ buffer_sptr out_buf = d->output(i);
+ LOG(std::ostringstream msg;
+ msg << m << " -- NO OUTPUT -- [" << i << "] -- OUTPUT BLOCKED";
+ GR_LOG_DEBUG(d_debug_logger, msg.str()););
+ gr::custom_lock lock(std::ref(*out_buf->mutex()), out_buf);
+ out_buf->output_blocked_callback(m->output_multiple(), true);
+ LOG(std::ostringstream msg; msg << m << " -- NO OUTPUT -- [" << i
+ << "] -- OUTPUT BLOCKED CBACK: " << rc;
+ GR_LOG_DEBUG(d_debug_logger, msg.str()););
+ }
+
// Have the caller try again...
return READY_NO_OUTPUT;
}
GR_LOG_ERROR(d_logger, "invalid state while going through iteration state machine");
were_done:
- LOG(GR_LOG_INFO(d_debug_logger, "we're done"););
+ // LOG(GR_LOG_INFO(d_debug_logger, "we're done"););
+ LOG(std::ostringstream msg; msg << m << " -- we're done";
+ GR_LOG_INFO(d_debug_logger, msg.str()));
d->set_done(true);
return DONE;
}
diff --git a/gnuradio-runtime/lib/buffer.cc b/gnuradio-runtime/lib/buffer.cc
index cb6b949932..7fd0a39579 100644
--- a/gnuradio-runtime/lib/buffer.cc
+++ b/gnuradio-runtime/lib/buffer.cc
@@ -13,6 +13,10 @@
#endif
#include "vmcircbuf.h"
#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_double_mapped.h>
+#include <gnuradio/buffer_reader.h>
+#include <gnuradio/buffer_single_mapped.h>
+#include <gnuradio/buffer_type.h>
#include <gnuradio/integer_math.h>
#include <gnuradio/math.h>
#include <boost/format.hpp>
@@ -23,7 +27,6 @@
namespace gr {
static long s_buffer_count = 0; // counts for debugging storage mgmt
-static long s_buffer_reader_count = 0;
/* ----------------------------------------------------------------------------
Notes on storage management
@@ -52,117 +55,81 @@ static long s_buffer_reader_count = 0;
gr::buffer_reader goes to zero, we can successfully reclaim it.
---------------------------------------------------------------------------- */
-/*
- * Compute the minimum number of buffer items that work (i.e.,
- * address space wrap-around works). To work is to satisfy this
- * constraint for integer buffer_size and k:
- *
- * type_size * nitems == k * page_size
- */
-static inline long minimum_buffer_items(long type_size, long page_size)
-{
- return page_size / GR_GCD(type_size, page_size);
-}
-
-buffer::buffer(int nitems, size_t sizeof_item, block_sptr link)
+buffer::buffer(BufferMappingType buf_type,
+ int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ block_sptr link)
: d_base(0),
d_bufsize(0),
+ d_buf_map_type(buf_type),
d_max_reader_delay(0),
+ d_max_reader_history(1),
+ d_has_history(false),
d_sizeof_item(sizeof_item),
d_link(link),
d_write_index(0),
d_abs_write_offset(0),
d_done(false),
- d_last_min_items_read(0)
+ d_last_min_items_read(0),
+ d_callback_flag(false),
+ d_active_pointer_counter(0),
+ d_downstream_lcm_nitems(downstream_lcm_nitems),
+ d_write_multiple(0)
{
gr::configure_default_loggers(d_logger, d_debug_logger, "buffer");
- if (!allocate_buffer(nitems, sizeof_item))
- throw std::bad_alloc();
s_buffer_count++;
}
-buffer_sptr make_buffer(int nitems, size_t sizeof_item, block_sptr link)
+buffer_sptr make_buffer(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ block_sptr link,
+ block_sptr buf_owner)
{
- return buffer_sptr(new buffer(nitems, sizeof_item, link));
-}
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ gr::logger_ptr logger;
+ gr::logger_ptr debug_logger;
+ gr::configure_default_loggers(logger, debug_logger, "make_buffer");
+ std::ostringstream msg;
+#endif
-buffer::~buffer()
-{
- assert(d_readers.size() == 0);
- s_buffer_count--;
-}
+#if DEBUG_SINGLE_MAPPED
+ if (1) {
+#else
+ if (buf_owner->get_buffer_type() != buftype_DEFAULT_NON_CUSTOM::get()) {
+#endif
+ // Buffer type is NOT the default non custom variety so allocate a
+ // buffer_single_mapped instance
+#ifdef BUFFER_DEBUG
+ msg << "buffer_single_mapped nitems: " << nitems
+ << " -- sizeof_item: " << sizeof_item;
+ GR_LOG_DEBUG(logger, msg.str());
+#endif
-/*!
- * sets d_vmcircbuf, d_base, d_bufsize.
- * returns true iff successful.
- */
-bool buffer::allocate_buffer(int nitems, size_t sizeof_item)
-{
- int orig_nitems = nitems;
-
- // Any buffersize we come up with must be a multiple of min_nitems.
- int granularity = gr::vmcircbuf_sysconfig::granularity();
- int min_nitems = minimum_buffer_items(sizeof_item, granularity);
-
- // Round-up nitems to a multiple of min_nitems.
- if (nitems % min_nitems != 0)
- nitems = ((nitems / min_nitems) + 1) * min_nitems;
-
- // If we rounded-up a whole bunch, give the user a heads up.
- // This only happens if sizeof_item is not a power of two.
-
- if (nitems > 2 * orig_nitems && nitems * (int)sizeof_item > granularity) {
- auto msg =
- str(boost::format(
- "allocate_buffer: tried to allocate"
- " %d items of size %d. Due to alignment requirements"
- " %d were allocated. If this isn't OK, consider padding"
- " your structure to a power-of-two bytes."
- " On this platform, our allocation granularity is %d bytes.") %
- orig_nitems % sizeof_item % nitems % granularity);
- GR_LOG_WARN(d_logger, msg.c_str());
- }
+ return buffer_sptr(new buffer_single_mapped(
+ nitems, sizeof_item, downstream_lcm_nitems, link, buf_owner));
- d_bufsize = nitems;
- d_vmcircbuf.reset(gr::vmcircbuf_sysconfig::make(d_bufsize * d_sizeof_item));
- if (d_vmcircbuf == 0) {
- std::ostringstream msg;
- msg << "gr::buffer::allocate_buffer: failed to allocate buffer of size "
- << d_bufsize * d_sizeof_item / 1024 << " KB";
- GR_LOG_ERROR(d_logger, msg.str());
- return false;
- }
+ } else {
+ // Default to allocating a buffer_double_mapped instance
+#ifdef BUFFER_DEBUG
+ msg << "buffer_double_mapped nitems: " << nitems
+ << " -- sizeof_item: " << sizeof_item;
+ GR_LOG_DEBUG(logger, msg.str());
+#endif
- d_base = (char*)d_vmcircbuf->pointer_to_first_copy();
- return true;
+ return buffer_sptr(
+ new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link));
+ }
}
-int buffer::space_available()
+buffer::~buffer()
{
- if (d_readers.empty())
- return d_bufsize - 1; // See comment below
-
- else {
- // Find out the maximum amount of data available to our readers
-
- int most_data = d_readers[0]->items_available();
- uint64_t min_items_read = d_readers[0]->nitems_read();
- for (size_t i = 1; i < d_readers.size(); i++) {
- most_data = std::max(most_data, d_readers[i]->items_available());
- min_items_read = std::min(min_items_read, d_readers[i]->nitems_read());
- }
-
- if (min_items_read != d_last_min_items_read) {
- prune_tags(d_last_min_items_read);
- d_last_min_items_read = min_items_read;
- }
-
- // The -1 ensures that the case d_write_index == d_read_index is
- // unambiguous. It indicates that there is no data for the reader
- return d_bufsize - most_data - 1;
- }
+ assert(d_readers.size() == 0);
+ s_buffer_count--;
}
void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; }
@@ -170,8 +137,22 @@ void* buffer::write_pointer() { return &d_base[d_write_index * d_sizeof_item]; }
void buffer::update_write_pointer(int nitems)
{
gr::thread::scoped_lock guard(*mutex());
+
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ unsigned orig_wr_idx = d_write_index;
+#endif
+
d_write_index = index_add(d_write_index, nitems);
d_abs_write_offset += nitems;
+
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] update_write_pointer -- orig d_write_index: " << orig_wr_idx
+ << " -- nitems: " << nitems << " -- d_write_index: " << d_write_index;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
}
void buffer::set_done(bool done)
@@ -180,20 +161,6 @@ void buffer::set_done(bool done)
d_done = done;
}
-buffer_reader_sptr
-buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay)
-{
- if (nzero_preload < 0)
- throw std::invalid_argument("buffer_add_reader: nzero_preload must be >= 0");
-
- buffer_reader_sptr r(
- new buffer_reader(buf, buf->index_sub(buf->d_write_index, nzero_preload), link));
- r->declare_sample_delay(delay);
- buf->d_readers.push_back(r.get());
-
- return r;
-}
-
void buffer::drop_reader(buffer_reader* reader)
{
std::vector<buffer_reader*>::iterator result =
@@ -263,92 +230,40 @@ void buffer::prune_tags(uint64_t max_time)
}
}
-long buffer_ncurrently_allocated() { return s_buffer_count; }
-
-// ----------------------------------------------------------------------------
-
-buffer_reader::buffer_reader(buffer_sptr buffer, unsigned int read_index, block_sptr link)
- : d_buffer(buffer),
- d_read_index(read_index),
- d_abs_read_offset(0),
- d_link(link),
- d_attr_delay(0)
-{
- s_buffer_reader_count++;
-}
-
-buffer_reader::~buffer_reader()
+void buffer::on_lock(gr::thread::scoped_lock& lock)
{
- d_buffer->drop_reader(this);
- s_buffer_reader_count--;
+ // NOTE: the protecting mutex (scoped_lock) is held by the custom_lock object
+
+ // Wait until no other callback is active and no pointers are active for
+ // the buffer, then mark the callback flag active.
+ d_cv.wait(lock, [this]() {
+ return (d_callback_flag == false && d_active_pointer_counter == 0);
+ });
+ d_callback_flag = true;
}
-void buffer_reader::declare_sample_delay(unsigned delay)
+void buffer::on_unlock()
{
- d_attr_delay = delay;
- d_buffer->d_max_reader_delay = std::max(d_attr_delay, d_buffer->d_max_reader_delay);
-}
-
-unsigned buffer_reader::sample_delay() const { return d_attr_delay; }
-
-int buffer_reader::items_available() const
-{
- return d_buffer->index_sub(d_buffer->d_write_index, d_read_index);
-}
+ // NOTE: the protecting mutex (scoped_lock) is held by the custom_lock object
-const void* buffer_reader::read_pointer()
-{
- return &d_buffer->d_base[d_read_index * d_buffer->d_sizeof_item];
+ // Mark the callback flag inactive and notify anyone waiting
+ d_callback_flag = false;
+ d_cv.notify_all();
}
-void buffer_reader::update_read_pointer(int nitems)
-{
- gr::thread::scoped_lock guard(*mutex());
- d_read_index = d_buffer->index_add(d_read_index, nitems);
- d_abs_read_offset += nitems;
-}
+long buffer_ncurrently_allocated() { return s_buffer_count; }
-void buffer_reader::get_tags_in_range(std::vector<tag_t>& v,
- uint64_t abs_start,
- uint64_t abs_end,
- long id)
+std::ostream& operator<<(std::ostream& os, const buffer& buf)
{
- gr::thread::scoped_lock guard(*mutex());
-
- uint64_t lower_bound = abs_start - d_attr_delay;
- // check for underflow and if so saturate at 0
- if (lower_bound > abs_start)
- lower_bound = 0;
- uint64_t upper_bound = abs_end - d_attr_delay;
- // check for underflow and if so saturate at 0
- if (upper_bound > abs_end)
- upper_bound = 0;
-
- v.clear();
- std::multimap<uint64_t, tag_t>::iterator itr =
- d_buffer->get_tags_lower_bound(lower_bound);
- std::multimap<uint64_t, tag_t>::iterator itr_end =
- d_buffer->get_tags_upper_bound(upper_bound);
-
- uint64_t item_time;
- while (itr != itr_end) {
- item_time = (*itr).second.offset + d_attr_delay;
- if ((item_time >= abs_start) && (item_time < abs_end)) {
- std::vector<long>::iterator id_itr;
- id_itr = std::find(
- itr->second.marked_deleted.begin(), itr->second.marked_deleted.end(), id);
- // If id is not in the vector of marked blocks
- if (id_itr == itr->second.marked_deleted.end()) {
- tag_t t = (*itr).second;
- t.offset += d_attr_delay;
- v.push_back(t);
- v.back().marked_deleted.clear();
- }
- }
- itr++;
+ os << std::endl
+ << " sz: " << buf.d_bufsize << std::endl
+ << " nrdrs: " << buf.d_readers.size() << std::endl;
+ for (auto& rdr : buf.d_readers) {
+ os << " rd_idx: " << rdr->get_read_index() << std::endl
+ << " abs_rd_offset: " << rdr->get_abs_read_offset() << std::endl
+ << std::endl;
}
+ return os;
}
-long buffer_reader_ncurrently_allocated() { return s_buffer_reader_count; }
-
} /* namespace gr */
diff --git a/gnuradio-runtime/lib/buffer_double_mapped.cc b/gnuradio-runtime/lib/buffer_double_mapped.cc
new file mode 100644
index 0000000000..ad99c2162b
--- /dev/null
+++ b/gnuradio-runtime/lib/buffer_double_mapped.cc
@@ -0,0 +1,155 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2020 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include "vmcircbuf.h"
+#include <gnuradio/block.h>
+#include <gnuradio/buffer_double_mapped.h>
+#include <gnuradio/buffer_reader.h>
+#include <gnuradio/integer_math.h>
+#include <gnuradio/math.h>
+#include <assert.h>
+#include <algorithm>
+#include <iostream>
+#include <stdexcept>
+
+namespace gr {
+
+/*
+ * Compute the minimum number of buffer items that work (i.e.,
+ * address space wrap-around works). To work is to satisfy this
+ * constraint for integer buffer_size and k:
+ *
+ * type_size * nitems == k * page_size
+ */
+static inline long minimum_buffer_items(long type_size, long page_size)
+{
+ return page_size / GR_GCD(type_size, page_size);
+}
+
+
+buffer_double_mapped::buffer_double_mapped(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ block_sptr link)
+ : buffer(BufferMappingType::DoubleMapped,
+ nitems,
+ sizeof_item,
+ downstream_lcm_nitems,
+ link)
+{
+ gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_double_mapped");
+ if (!allocate_buffer(nitems, sizeof_item))
+ throw std::bad_alloc();
+
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ {
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "buffer_double_mapped constructor -- history: " << link->history();
+ GR_LOG_DEBUG(d_logger, msg.str());
+ }
+#endif
+}
+
+buffer_sptr make_buffer_double_mapped(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ block_sptr link)
+{
+ return buffer_sptr(
+ new buffer_double_mapped(nitems, sizeof_item, downstream_lcm_nitems, link));
+}
+
+buffer_double_mapped::~buffer_double_mapped() {}
+
+/*!
+ * sets d_vmcircbuf, d_base, d_bufsize.
+ * returns true iff successful.
+ */
+bool buffer_double_mapped::allocate_buffer(int nitems, size_t sizeof_item)
+{
+ int orig_nitems = nitems;
+
+ // Any buffer size we come up with must be a multiple of min_nitems.
+ int granularity = gr::vmcircbuf_sysconfig::granularity();
+ int min_nitems = minimum_buffer_items(sizeof_item, granularity);
+
+ // Round-up nitems to a multiple of min_nitems.
+ if (nitems % min_nitems != 0)
+ nitems = ((nitems / min_nitems) + 1) * min_nitems;
+
+ // If we rounded-up a whole bunch, give the user a heads up.
+ // This only happens if sizeof_item is not a power of two.
+ if (nitems > 2 * orig_nitems && nitems * (int)sizeof_item > granularity) {
+ auto msg =
+ str(boost::format(
+ "allocate_buffer: tried to allocate"
+ " %d items of size %d. Due to alignment requirements"
+ " %d were allocated. If this isn't OK, consider padding"
+ " your structure to a power-of-two bytes."
+ " On this platform, our allocation granularity is %d bytes.") %
+ orig_nitems % sizeof_item % nitems % granularity);
+ GR_LOG_WARN(d_logger, msg.c_str());
+ }
+
+ d_bufsize = nitems;
+ d_vmcircbuf.reset(gr::vmcircbuf_sysconfig::make(d_bufsize * d_sizeof_item));
+ if (d_vmcircbuf == 0) {
+ std::ostringstream msg;
+ msg << "gr::buffer::allocate_buffer: failed to allocate buffer of size "
+ << d_bufsize * d_sizeof_item / 1024 << " KB";
+ GR_LOG_ERROR(d_logger, msg.str());
+ return false;
+ }
+
+ d_base = (char*)d_vmcircbuf->pointer_to_first_copy();
+ return true;
+}
+
+int buffer_double_mapped::space_available()
+{
+ if (d_readers.empty())
+ return d_bufsize - 1; // See comment below
+
+ else {
+
+ // Find out the maximum amount of data available to our readers
+ int most_data = d_readers[0]->items_available();
+ uint64_t min_items_read = d_readers[0]->nitems_read();
+ for (size_t i = 1; i < d_readers.size(); i++) {
+ most_data = std::max(most_data, d_readers[i]->items_available());
+ min_items_read = std::min(min_items_read, d_readers[i]->nitems_read());
+ }
+
+ if (min_items_read != d_last_min_items_read) {
+ prune_tags(d_last_min_items_read);
+ d_last_min_items_read = min_items_read;
+ }
+
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "space_available() called d_write_index: " << d_write_index
+ << " -- space_available: " << (d_bufsize - most_data - 1);
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // The -1 ensures that the case d_write_index == d_read_index is
+ // unambiguous. It indicates that there is no data for the reader
+ return d_bufsize - most_data - 1;
+ }
+}
+
+} /* namespace gr */
diff --git a/gnuradio-runtime/lib/buffer_reader.cc b/gnuradio-runtime/lib/buffer_reader.cc
new file mode 100644
index 0000000000..7ead53032e
--- /dev/null
+++ b/gnuradio-runtime/lib/buffer_reader.cc
@@ -0,0 +1,180 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2009,2010,2013 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include <gnuradio/block.h>
+#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_reader.h>
+#include <gnuradio/buffer_reader_sm.h>
+#include <gnuradio/integer_math.h>
+#include <gnuradio/math.h>
+#include <assert.h>
+#include <algorithm>
+#include <iostream>
+#include <stdexcept>
+
+namespace gr {
+
+static long s_buffer_reader_count = 0;
+
+buffer_reader_sptr
+buffer_add_reader(buffer_sptr buf, int nzero_preload, block_sptr link, int delay)
+{
+ if (nzero_preload < 0)
+ throw std::invalid_argument("buffer_add_reader: nzero_preload must be >= 0");
+
+ buffer_reader_sptr r;
+
+ if (buf->get_mapping_type() == BufferMappingType::DoubleMapped) {
+ r.reset(new buffer_reader(
+ buf, buf->index_sub(buf->d_write_index, nzero_preload), link));
+ r->declare_sample_delay(delay);
+ } else if (buf->get_mapping_type() == BufferMappingType::SingleMapped) {
+ r.reset(new buffer_reader_sm(
+ buf, buf->index_sub(buf->d_write_index, nzero_preload), link));
+ r->declare_sample_delay(delay);
+
+ // Update reader block history
+ buf->update_reader_block_history(link->history(), delay);
+ r->d_read_index = buf->d_write_index - nzero_preload;
+ }
+
+ buf->d_readers.push_back(r.get());
+
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ std::cerr << " [" << buf.get() << ";" << r.get()
+ << "] buffer_add_reader() nzero_preload " << nzero_preload
+ << " -- delay: " << delay << " -- history: " << link->history()
+ << " -- RD_idx: " << r->d_read_index << std::endl;
+#endif
+
+ return r;
+}
+
+buffer_reader::buffer_reader(buffer_sptr buffer, unsigned int read_index, block_sptr link)
+ : d_buffer(buffer),
+ d_read_index(read_index),
+ d_abs_read_offset(0),
+ d_link(link),
+ d_attr_delay(0)
+{
+#ifdef BUFFER_DEBUG
+ gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_reader");
+#endif
+
+ s_buffer_reader_count++;
+}
+
+buffer_reader::~buffer_reader()
+{
+ d_buffer->drop_reader(this);
+ s_buffer_reader_count--;
+}
+
+void buffer_reader::declare_sample_delay(unsigned delay)
+{
+ d_attr_delay = delay;
+ d_buffer->d_max_reader_delay = std::max(d_attr_delay, d_buffer->d_max_reader_delay);
+}
+
+unsigned buffer_reader::sample_delay() const { return d_attr_delay; }
+
+int buffer_reader::items_available() // const
+{
+ int available = d_buffer->index_sub(d_buffer->d_write_index, d_read_index);
+
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ std::ostringstream msg;
+ msg << "[" << d_buffer << ";" << this << "] "
+ << "items_available() WR_idx: " << d_buffer->d_write_index
+ << " -- WR items: " << d_buffer->nitems_written()
+ << " -- RD_idx: " << d_read_index << " -- RD items: " << nitems_read() << " (-"
+ << d_attr_delay << ") -- available: " << available;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ return available;
+}
+
+const void* buffer_reader::read_pointer()
+{
+ return &d_buffer->d_base[d_read_index * d_buffer->d_sizeof_item];
+}
+
+void buffer_reader::update_read_pointer(int nitems)
+{
+ gr::thread::scoped_lock guard(*mutex());
+
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ unsigned orig_rd_idx = d_read_index;
+#endif
+
+ d_read_index = d_buffer->index_add(d_read_index, nitems);
+ d_abs_read_offset += nitems;
+
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ std::ostringstream msg;
+ msg << "[" << d_buffer << ";" << this
+ << "] update_read_pointer -- orig d_read_index: " << orig_rd_idx
+ << " -- nitems: " << nitems << " -- d_read_index: " << d_read_index;
+ GR_LOG_DEBUG(d_buffer->d_logger, msg.str());
+#endif
+}
+
+void buffer_reader::get_tags_in_range(std::vector<tag_t>& v,
+ uint64_t abs_start,
+ uint64_t abs_end,
+ long id)
+{
+ gr::thread::scoped_lock guard(*mutex());
+
+ uint64_t lower_bound = abs_start - d_attr_delay;
+ // check for underflow and if so saturate at 0
+ if (lower_bound > abs_start)
+ lower_bound = 0;
+ uint64_t upper_bound = abs_end - d_attr_delay;
+ // check for underflow and if so saturate at 0
+ if (upper_bound > abs_end)
+ upper_bound = 0;
+
+ v.clear();
+ std::multimap<uint64_t, tag_t>::iterator itr =
+ d_buffer->get_tags_lower_bound(lower_bound);
+ std::multimap<uint64_t, tag_t>::iterator itr_end =
+ d_buffer->get_tags_upper_bound(upper_bound);
+
+ uint64_t item_time;
+ while (itr != itr_end) {
+ item_time = (*itr).second.offset + d_attr_delay;
+ if ((item_time >= abs_start) && (item_time < abs_end)) {
+ std::vector<long>::iterator id_itr;
+ id_itr = std::find(
+ itr->second.marked_deleted.begin(), itr->second.marked_deleted.end(), id);
+ // If id is not in the vector of marked blocks
+ if (id_itr == itr->second.marked_deleted.end()) {
+ tag_t t = (*itr).second;
+ t.offset += d_attr_delay;
+ v.push_back(t);
+ v.back().marked_deleted.clear();
+ }
+ }
+ itr++;
+ }
+}
+
+long buffer_reader_ncurrently_allocated() { return s_buffer_reader_count; }
+
+} /* namespace gr */
diff --git a/gnuradio-runtime/lib/buffer_reader_sm.cc b/gnuradio-runtime/lib/buffer_reader_sm.cc
new file mode 100644
index 0000000000..9e0fac584f
--- /dev/null
+++ b/gnuradio-runtime/lib/buffer_reader_sm.cc
@@ -0,0 +1,149 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004,2009,2010,2013 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include <gnuradio/block.h>
+#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_reader_sm.h>
+#include <gnuradio/integer_math.h>
+#include <gnuradio/math.h>
+#include <assert.h>
+#include <algorithm>
+#include <iostream>
+#include <limits>
+#include <stdexcept>
+
+namespace gr {
+
+buffer_reader_sm::~buffer_reader_sm() {}
+
+int buffer_reader_sm::items_available()
+{
+ int available = 0;
+
+ if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) {
+ if (d_buffer->d_write_index == d_read_index) {
+ if ((nitems_read() - sample_delay()) != d_buffer->nitems_written()) {
+ available = d_buffer->d_bufsize - d_read_index;
+ }
+ } else {
+ available = d_buffer->index_sub(d_buffer->d_write_index, d_read_index);
+ }
+ }
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << d_buffer << ";" << this << "] "
+ << "items_available() WR_idx: " << d_buffer->d_write_index
+ << " -- WR items: " << d_buffer->nitems_written()
+ << " -- RD_idx: " << d_read_index << " -- RD items: " << nitems_read() << " (-"
+ << d_attr_delay << ") -- available: " << available;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ return available;
+}
+
+bool buffer_reader_sm::input_blkd_cb_ready(int items_required) const
+{
+ gr::thread::scoped_lock(*d_buffer->mutex());
+
+ return (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) &&
+ (d_buffer->d_write_index < d_read_index));
+}
+
+bool buffer_reader_sm::input_blocked_callback(int items_required, int items_avail)
+{
+ // Maybe adjust read pointers from min read index?
+ // This would mean that *all* readers must be > (passed) the write index
+ if (((d_buffer->d_bufsize - d_read_index) < (uint32_t)items_required) &&
+ (d_buffer->d_write_index < d_read_index)) {
+
+ // Update items available before going farther as it could be stale
+ items_avail = items_available();
+
+ // Find reader with the smallest read index that is greater than the
+ // write index
+ uint32_t min_reader_index = std::numeric_limits<uint32_t>::max();
+ uint32_t min_read_idx = std::numeric_limits<uint32_t>::max();
+ for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
+ if (d_buffer->d_readers[idx]->d_read_index > d_buffer->d_write_index) {
+ // Record index of reader with minimum read-index
+ if (d_buffer->d_readers[idx]->d_read_index < min_read_idx) {
+ min_read_idx = d_buffer->d_readers[idx]->d_read_index;
+ min_reader_index = idx;
+ }
+ }
+ }
+
+ // Note items_avail might be zero, that's okay.
+ items_avail += d_read_index - min_read_idx;
+ int gap = min_read_idx - d_buffer->d_write_index;
+ if (items_avail > gap) {
+ return false;
+ }
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << d_buffer << ";" << this << "] "
+ << "input_blocked_callback() WR_idx: " << d_buffer->d_write_index
+ << " -- WR items: " << d_buffer->nitems_written()
+ << " -- BUFSIZE: " << d_buffer->d_bufsize << " -- RD_idx: " << min_read_idx;
+ for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
+ if (idx != min_reader_index) {
+ msg << " -- OTHER_RDR: " << d_buffer->d_readers[idx]->d_read_index;
+ }
+ }
+
+ msg << " -- GAP: " << gap << " -- items_required: " << items_required
+ << " -- items_avail: " << items_avail;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // Shift existing data down to make room for blocked data at end of buffer
+ uint32_t move_data_size = d_buffer->d_write_index * d_buffer->d_sizeof_item;
+ char* dest = d_buffer->d_base + (items_avail * d_buffer->d_sizeof_item);
+ std::memmove(dest, d_buffer->d_base, move_data_size);
+
+ // Next copy the data from the end of the buffer back to the beginning
+ uint32_t avail_data_size = items_avail * d_buffer->d_sizeof_item;
+ char* src = d_buffer->d_base + (min_read_idx * d_buffer->d_sizeof_item);
+ std::memcpy(d_buffer->d_base, src, avail_data_size);
+
+ // Now adjust write pointer
+ d_buffer->d_write_index += items_avail;
+
+ // Finally adjust all reader pointers
+ for (size_t idx = 0; idx < d_buffer->d_readers.size(); ++idx) {
+ if (idx == min_reader_index) {
+ d_buffer->d_readers[idx]->d_read_index = 0;
+ } else {
+ d_buffer->d_readers[idx]->d_read_index += items_avail;
+ d_buffer->d_readers[idx]->d_read_index %= d_buffer->d_bufsize;
+ }
+ }
+
+ return true;
+ }
+
+ return false;
+}
+
+buffer_reader_sm::buffer_reader_sm(buffer_sptr buffer,
+ unsigned int read_index,
+ block_sptr link)
+ : buffer_reader(buffer, read_index, link)
+{
+}
+
+
+} /* namespace gr */
diff --git a/gnuradio-runtime/lib/buffer_single_mapped.cc b/gnuradio-runtime/lib/buffer_single_mapped.cc
new file mode 100644
index 0000000000..6024396557
--- /dev/null
+++ b/gnuradio-runtime/lib/buffer_single_mapped.cc
@@ -0,0 +1,297 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2020 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+#include <gnuradio/block.h>
+#include <gnuradio/buffer_reader.h>
+#include <gnuradio/buffer_single_mapped.h>
+#include <gnuradio/integer_math.h>
+#include <gnuradio/math.h>
+#include <gnuradio/thread/thread.h>
+#include <assert.h>
+#include <algorithm>
+#include <iostream>
+#include <stdexcept>
+
+namespace gr {
+
+buffer_single_mapped::buffer_single_mapped(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems,
+ block_sptr link,
+ block_sptr buf_owner)
+ : buffer(BufferMappingType::SingleMapped,
+ nitems,
+ sizeof_item,
+ downstream_lcm_nitems,
+ link),
+ d_buf_owner(buf_owner),
+ d_buffer(nullptr,
+ std::bind(&buffer_single_mapped::deleter, this, std::placeholders::_1))
+{
+ gr::configure_default_loggers(d_logger, d_debug_logger, "buffer_single_mapped");
+ if (!allocate_buffer(nitems, sizeof_item, downstream_lcm_nitems))
+ throw std::bad_alloc();
+
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ {
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "buffer_single_mapped constructor -- history: " << link->history();
+ GR_LOG_DEBUG(d_logger, msg.str());
+ }
+#endif
+}
+
+buffer_single_mapped::~buffer_single_mapped() {}
+
+/*!
+ * Allocates underlying buffer.
+ * returns true iff successful.
+ */
+bool buffer_single_mapped::allocate_buffer(int nitems,
+ size_t sizeof_item,
+ uint64_t downstream_lcm_nitems)
+{
+#ifdef BUFFER_DEBUG
+ int orig_nitems = nitems;
+#endif
+
+ // Unlike the double mapped buffer case that can easily wrap back onto itself
+ // for both reads and writes the single mapped case needs to be aware of read
+ // and write granularity and size the underlying buffer accordingly. Otherwise
+ // the calls to space_available() and items_available() may return values that
+ // are too small and the scheduler will get stuck.
+ uint64_t write_granularity = 1;
+
+ if (link()->fixed_rate()) {
+ // Fixed rate
+ int num_inputs =
+ link()->fixed_rate_noutput_to_ninput(1) - (link()->history() - 1);
+ write_granularity =
+ link()->fixed_rate_ninput_to_noutput(num_inputs + (link()->history() - 1));
+ }
+
+ if (link()->relative_rate() != 1.0) {
+ // Some blocks say they have fixed rate but actually have a relative
+ // rate set (looking at you puncture_bb...) so make this a separate
+ // check.
+
+ // Relative rate
+ write_granularity = link()->relative_rate_i();
+ }
+
+ // If the output multiple has been set explicitly then adjust the write
+ // granularity.
+ if (link()->output_multiple_set()) {
+ write_granularity =
+ GR_LCM(write_granularity, (uint64_t)link()->output_multiple());
+ }
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "WRITE GRANULARITY: " << write_granularity;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // Adjust size so output buffer size is a multiple of the write granularity
+ if (write_granularity != 1 || downstream_lcm_nitems != 1) {
+ uint64_t size_align_adjust = GR_LCM(write_granularity, downstream_lcm_nitems);
+ uint64_t remainder = nitems % size_align_adjust;
+ nitems += (remainder > 0) ? (size_align_adjust - remainder) : 0;
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "allocate_buffer()** called nitems: " << orig_nitems
+ << " -- read_multiple: " << downstream_lcm_nitems
+ << " -- write_multiple: " << write_granularity
+ << " -- NEW nitems: " << nitems;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+ }
+
+ // Allocate a new custom buffer from the owning block
+ char* buf = buf_owner()->allocate_custom_buffer(nitems * sizeof_item);
+ assert(buf != nullptr);
+ d_buffer.reset(buf);
+
+ d_base = d_buffer.get();
+ d_bufsize = nitems;
+
+ d_downstream_lcm_nitems = downstream_lcm_nitems;
+ d_write_multiple = write_granularity;
+
+ return true;
+}
+
+bool buffer_single_mapped::output_blkd_cb_ready(int output_multiple)
+{
+ uint32_t space_avail = 0;
+ {
+ gr::thread::scoped_lock(*this->mutex());
+ space_avail = space_available();
+ }
+ return ((space_avail > 0) &&
+ ((space_avail / output_multiple) * output_multiple == 0));
+}
+
+
+bool buffer_single_mapped::output_blocked_callback(int output_multiple, bool force)
+{
+ uint32_t space_avail = space_available();
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "output_blocked_callback()*** WR_idx: " << d_write_index
+ << " -- WR items: " << nitems_written()
+ << " -- output_multiple: " << output_multiple
+ << " -- space_avail: " << space_avail << " -- force: " << force;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ if (((space_avail > 0) && ((space_avail / output_multiple) * output_multiple == 0)) ||
+ force) {
+ // Find reader with the smallest read index
+ uint32_t min_read_idx = d_readers[0]->d_read_index;
+ for (size_t idx = 1; idx < d_readers.size(); ++idx) {
+ // Record index of reader with minimum read-index
+ if (d_readers[idx]->d_read_index < min_read_idx) {
+ min_read_idx = d_readers[idx]->d_read_index;
+ }
+ }
+
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "output_blocked_callback() WR_idx: " << d_write_index
+ << " -- WR items: " << nitems_written() << " -- min RD_idx: " << min_read_idx
+ << " -- shortcircuit: "
+ << ((min_read_idx == 0) || (min_read_idx >= d_write_index))
+ << " -- to_move_items: " << (d_write_index - min_read_idx)
+ << " -- space_avail: " << space_avail << " -- force: " << force;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ // Make sure we have enough room to start writing back at the beginning
+ if ((min_read_idx == 0) || (min_read_idx >= d_write_index)) {
+ return false;
+ }
+
+ // Determine how much "to be read" data needs to be moved
+ int to_move_items = d_write_index - min_read_idx;
+ assert(to_move_items > 0);
+ uint32_t to_move_bytes = to_move_items * d_sizeof_item;
+
+ // Shift "to be read" data back to the beginning of the buffer
+ std::memmove(d_base, d_base + (min_read_idx * d_sizeof_item), to_move_bytes);
+
+ // Adjust write index and each reader index
+ d_write_index -= min_read_idx;
+
+ for (size_t idx = 0; idx < d_readers.size(); ++idx) {
+ d_readers[idx]->d_read_index -= min_read_idx;
+ }
+
+ return true;
+ }
+
+ return false;
+}
+
+int buffer_single_mapped::space_available()
+{
+ if (d_readers.empty())
+ return d_bufsize;
+
+ else {
+
+ size_t min_items_read_idx = 0;
+ uint64_t min_items_read = d_readers[0]->nitems_read();
+ for (size_t idx = 1; idx < d_readers.size(); ++idx) {
+ // Record index of reader with minimum nitems read
+ if (d_readers[idx]->nitems_read() <
+ d_readers[min_items_read_idx]->nitems_read()) {
+ min_items_read_idx = idx;
+ }
+ min_items_read = std::min(min_items_read, d_readers[idx]->nitems_read());
+ }
+
+ buffer_reader* min_idx_reader = d_readers[min_items_read_idx];
+ unsigned min_read_index = d_readers[min_items_read_idx]->d_read_index;
+
+ // For single mapped buffer there is no wrapping beyond the end of the
+ // buffer
+#ifdef BUFFER_DEBUG
+ int thecase = 4; // REMOVE ME - just for debug
+#endif
+ int space = d_bufsize - d_write_index;
+
+ if (min_read_index == d_write_index) {
+#ifdef BUFFER_DEBUG
+ thecase = 1;
+#endif
+
+ // If the (min) read index and write index are equal then the buffer
+ // is either completely empty or completely full depending on if
+ // the number of items read matches the number written
+ size_t offset = ((min_idx_reader->link()->history() - 1) +
+ min_idx_reader->sample_delay());
+ if ((min_idx_reader->nitems_read() - offset) != nitems_written()) {
+#ifdef BUFFER_DEBUG
+ thecase = 2;
+#endif
+ space = 0;
+ }
+ } else if (min_read_index > d_write_index) {
+#ifdef BUFFER_DEBUG
+ thecase = 3;
+#endif
+ space = min_read_index - d_write_index;
+ // Leave extra space in case the reader gets stuck and needs realignment
+ {
+ if ((d_write_index > (d_bufsize / 2)) ||
+ (min_read_index < (d_bufsize / 2))) {
+#ifdef BUFFER_DEBUG
+ thecase = 17;
+#endif
+ space = 0;
+ } else {
+ space = (d_bufsize / 2) - d_write_index;
+ }
+ }
+ }
+
+ if (min_items_read != d_last_min_items_read) {
+ prune_tags(d_last_min_items_read);
+ d_last_min_items_read = min_items_read;
+ }
+
+#ifdef BUFFER_DEBUG
+ // BUFFER DEBUG
+ std::ostringstream msg;
+ msg << "[" << this << "] "
+ << "space_available() called (case: " << thecase
+ << ") d_write_index: " << d_write_index << " (" << nitems_written() << ") "
+ << " -- min_read_index: " << min_read_index << " ("
+ << min_idx_reader->nitems_read() << ") "
+ << " -- space: " << space
+ << " (sample delay: " << min_idx_reader->sample_delay() << ")";
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+
+ return space;
+ }
+}
+
+} /* namespace gr */
diff --git a/gnuradio-runtime/lib/buffer_type.cc b/gnuradio-runtime/lib/buffer_type.cc
new file mode 100644
index 0000000000..b667eded1e
--- /dev/null
+++ b/gnuradio-runtime/lib/buffer_type.cc
@@ -0,0 +1,18 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2020 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+#include <gnuradio/buffer_type.h>
+
+namespace gr {
+
+uint32_t buffer_type_base::s_nextId = 0;
+std::mutex buffer_type_base::s_mutex;
+
+
+} /* namespace gr */ \ No newline at end of file
diff --git a/gnuradio-runtime/lib/flat_flowgraph.cc b/gnuradio-runtime/lib/flat_flowgraph.cc
index 0df75553e0..b9986f8370 100644
--- a/gnuradio-runtime/lib/flat_flowgraph.cc
+++ b/gnuradio-runtime/lib/flat_flowgraph.cc
@@ -15,6 +15,9 @@
#include "flat_flowgraph.h"
#include <gnuradio/block_detail.h>
#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_reader.h>
+#include <gnuradio/buffer_type.h>
+#include <gnuradio/integer_math.h>
#include <gnuradio/logger.h>
#include <gnuradio/prefs.h>
#include <volk/volk.h>
@@ -24,6 +27,7 @@
namespace gr {
+
// 32Kbyte buffer size between blocks
#define GR_FIXED_BUFFER_SIZE (32 * (1L << 10))
@@ -44,8 +48,9 @@ void flat_flowgraph::setup_connections()
basic_block_vector_t blocks = calc_used_blocks();
// Assign block details to blocks
- for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++)
- cast_to_block_sptr(*p)->set_detail(allocate_block_detail(*p));
+ for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
+ allocate_block_detail(*p);
+ }
// Connect inputs to outputs for each block
for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
@@ -56,7 +61,7 @@ void flat_flowgraph::setup_connections()
block->set_is_unaligned(false);
}
- // Connect message ports connetions
+ // Connect message ports connections
for (msg_edge_viter_t i = d_msg_edges.begin(); i != d_msg_edges.end(); i++) {
GR_LOG_DEBUG(
d_debug_logger,
@@ -67,11 +72,10 @@ void flat_flowgraph::setup_connections()
}
}
-block_detail_sptr flat_flowgraph::allocate_block_detail(basic_block_sptr block)
+void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
{
int ninputs = calc_used_ports(block, true).size();
int noutputs = calc_used_ports(block, false).size();
- block_detail_sptr detail = make_block_detail(ninputs, noutputs);
block_sptr grblock = cast_to_block_sptr(block);
if (!grblock)
@@ -80,98 +84,88 @@ block_detail_sptr flat_flowgraph::allocate_block_detail(basic_block_sptr block)
block->alias())
.str());
- GR_LOG_DEBUG(d_debug_logger, "Creating block detail for " + block->identifier());
+ // Determine the downstream max per output port
+ std::vector<int> downstream_max_nitems(noutputs, 0);
+ std::vector<uint64_t> downstream_lcm_nitems(noutputs, 1);
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "BLOCK: " << block->identifier();
+ GR_LOG_DEBUG(d_logger, msg.str()); // could also be d_debug_logger
+#endif
for (int i = 0; i < noutputs; i++) {
- grblock->expand_minmax_buffer(i);
+ int nitems = 0;
+ uint64_t lcm_nitems = 1;
+ basic_block_vector_t downstream_blocks = calc_downstream_blocks(grblock, i);
+ for (basic_block_viter_t blk = downstream_blocks.begin();
+ blk != downstream_blocks.end();
+ blk++) {
+ block_sptr dgrblock = cast_to_block_sptr(*blk);
+ if (!dgrblock)
+ throw std::runtime_error("allocate_buffer found non-gr::block");
+
+#ifdef BUFFER_DEBUG
+ msg.str("");
+ msg << " DWNSTRM BLOCK: " << dgrblock->identifier();
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
- buffer_sptr buffer = allocate_buffer(block, i);
- GR_LOG_DEBUG(d_debug_logger,
- "Allocated buffer for output " + block->identifier() + " " +
- std::to_string(i));
- detail->set_output(i, buffer);
-
- // Update the block's max_output_buffer based on what was actually allocated.
- if ((grblock->max_output_buffer(i) != buffer->bufsize()) &&
- (grblock->max_output_buffer(i) != -1))
- GR_LOG_WARN(d_logger,
- boost::format("Block (%1%) max output buffer set to %2%"
- " instead of requested %3%") %
- grblock->alias() % buffer->bufsize() %
- grblock->max_output_buffer(i));
- grblock->set_max_output_buffer(i, buffer->bufsize());
- }
+ // If any downstream blocks are decimators and/or have a large
+ // output_multiple, ensure we have a buffer at least twice their
+ // decimation factor*output_multiple
+ double decimation = (1.0 / dgrblock->relative_rate());
+ int multiple = dgrblock->output_multiple();
+ int history = dgrblock->history();
+ nitems =
+ std::max(nitems, static_cast<int>(2 * (decimation * multiple + history)));
+
+ // Calculate the LCM of downstream reader nitems
+#ifdef BUFFER_DEBUG
+ msg.str("");
+ msg << " OUT MULTIPLE: " << multiple;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
- return detail;
-}
+ if (dgrblock->fixed_rate()) {
+ lcm_nitems = GR_LCM(lcm_nitems,
+ (uint64_t)(dgrblock->fixed_rate_noutput_to_ninput(1) -
+ (dgrblock->history() - 1)));
+ }
+ if (dgrblock->relative_rate() != 1.0) {
+ // Relative rate
+ lcm_nitems = GR_LCM(lcm_nitems, dgrblock->relative_rate_d());
+ }
-buffer_sptr flat_flowgraph::allocate_buffer(basic_block_sptr block, int port)
-{
- block_sptr grblock = cast_to_block_sptr(block);
- if (!grblock)
- throw std::runtime_error("allocate_buffer found non-gr::block");
- int item_size = block->output_signature()->sizeof_stream_item(port);
-
- // *2 because we're now only filling them 1/2 way in order to
- // increase the available parallelism when using the TPB scheduler.
- // (We're double buffering, where we used to single buffer)
- int nitems = s_fixed_buffer_size * 2 / item_size;
-
- // Make sure there are at least twice the output_multiple no. of items
- if (nitems < 2 * grblock->output_multiple()) // Note: this means output_multiple()
- nitems = 2 * grblock->output_multiple(); // can't be changed by block dynamically
-
- // If any downstream blocks are decimators and/or have a large output_multiple,
- // ensure we have a buffer at least twice their decimation factor*output_multiple
- basic_block_vector_t blocks = calc_downstream_blocks(block, port);
-
- // limit buffer size if indicated
- if (grblock->max_output_buffer(port) > 0) {
- // GR_LOG_INFO(d_debug_logger, boost::format("constraining output items to %d")
- // % block->max_output_buffer(port));
- nitems = std::min((long)nitems, (long)grblock->max_output_buffer(port));
- nitems -= nitems % grblock->output_multiple();
- if (nitems < 1)
- throw std::runtime_error("problems allocating a buffer with the given max "
- "output buffer constraint!");
- } else if (grblock->min_output_buffer(port) > 0) {
- nitems = std::max((long)nitems, (long)grblock->min_output_buffer(port));
- nitems -= nitems % grblock->output_multiple();
- if (nitems < 1)
- throw std::runtime_error("problems allocating a buffer with the given min "
- "output buffer constraint!");
- }
+ // Sanity check, make sure lcm_nitems is at least 1
+ if (lcm_nitems < 1) {
+ lcm_nitems = 1;
+ }
- for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
- block_sptr dgrblock = cast_to_block_sptr(*p);
- if (!dgrblock)
- throw std::runtime_error("allocate_buffer found non-gr::block");
-
- double decimation = (1.0 / dgrblock->relative_rate());
- int multiple = dgrblock->output_multiple();
- int history = dgrblock->history();
- nitems =
- std::max(nitems, static_cast<int>(2 * (decimation * multiple + history)));
- }
+#ifdef BUFFER_DEBUG
+ msg.str("");
+ msg << " NINPUT_ITEMS: " << nitems;
+ GR_LOG_DEBUG(d_logger, msg.str());
- // std::cout << "make_buffer(" << nitems << ", " << item_size << ", " << grblock <<
- // "\n";
- // We're going to let this fail once and retry. If that fails,
- // throw and exit.
- buffer_sptr b;
- try {
- b = make_buffer(nitems, item_size, grblock);
- } catch (std::bad_alloc&) {
- b = make_buffer(nitems, item_size, grblock);
- }
+ msg.str("");
+ msg << " LCM NITEMS: " << lcm_nitems;
+ GR_LOG_DEBUG(d_logger, msg.str());
- // Set the max noutput items size here to make sure it's always
- // set in the block and available in the start() method.
- // But don't overwrite if the user has set this externally.
- if (!grblock->is_set_max_noutput_items())
- grblock->set_max_noutput_items(nitems);
+ msg.str("");
+ msg << " HISTORY: " << dgrblock->history();
+ GR_LOG_DEBUG(d_logger, msg.str());
- return b;
+ msg.str("");
+ msg << " DELAY: " << dgrblock->sample_delay(0);
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+ }
+ downstream_max_nitems[i] = nitems;
+ downstream_lcm_nitems[i] = lcm_nitems;
+ }
+
+ // Allocate the block detail and necessary buffers
+ grblock->allocate_detail(
+ ninputs, noutputs, downstream_max_nitems, downstream_lcm_nitems);
}
void flat_flowgraph::connect_block_inputs(basic_block_sptr block)
@@ -194,8 +188,40 @@ void flat_flowgraph::connect_block_inputs(basic_block_sptr block)
block_sptr src_grblock = cast_to_block_sptr(src_block);
if (!src_grblock)
throw std::runtime_error("connect_block_inputs found non-gr::block");
- buffer_sptr src_buffer = src_grblock->detail()->output(src_port);
+ buffer_sptr src_buffer;
+ buffer_type_t src_buf_type = src_grblock->get_buffer_type();
+ buffer_type_t dest_buf_type = grblock->get_buffer_type();
+ if (dest_buf_type == buftype_DEFAULT_NON_CUSTOM::get() ||
+ dest_buf_type == src_buf_type) {
+ // The block is not using a custom buffer OR the block and the upstream
+ // block both use the same kind of custom buffer
+ src_buffer = src_grblock->detail()->output(src_port);
+ } else {
+ if (dest_buf_type != buftype_DEFAULT_NON_CUSTOM::get() &&
+ src_buf_type == buftype_DEFAULT_NON_CUSTOM::get()) {
+ // The block uses a custom buffer but the upstream block does not
+ // therefore the upstream block's buffer can be replaced with the
+ // type of buffer that the block needs
+ std::ostringstream msg;
+ msg << "Block: " << grblock->identifier()
+ << "replacing upstream block: " << src_grblock->identifier()
+ << " buffer with a custom buffer";
+ GR_LOG_DEBUG(d_debug_logger, msg.str());
+ src_buffer = src_grblock->replace_buffer(src_port, grblock);
+ } else {
+ // Both the block and upstream block use incompatible buffer types
+ // which is not currently allowed
+ std::ostringstream msg;
+ msg << "Block: " << grblock->identifier()
+ << " and upstream block: " << src_grblock->identifier()
+ << " use incompatible custom buffer types (" << dest_buf_type.name()
+ << " -- " << src_buf_type.name() << ") --> "
+ << (dest_buf_type == src_buf_type);
+ GR_LOG_ERROR(d_logger, msg.str());
+ throw std::runtime_error(msg.str());
+ }
+ }
GR_LOG_DEBUG(d_debug_logger,
"Setting input " + std::to_string(dst_port) + " from edge " +
@@ -220,7 +246,7 @@ void flat_flowgraph::merge_connections(flat_flowgraph_sptr old_ffg)
if (!block->detail()) {
GR_LOG_DEBUG(d_debug_logger,
"merge: allocating new detail for block " + block->identifier());
- block->set_detail(allocate_block_detail(block));
+ allocate_block_detail(block);
} else {
GR_LOG_DEBUG(d_debug_logger,
"merge: reusing original detail for block " +
diff --git a/gnuradio-runtime/lib/flat_flowgraph.h b/gnuradio-runtime/lib/flat_flowgraph.h
index deb0fe2c6a..71645d6a25 100644
--- a/gnuradio-runtime/lib/flat_flowgraph.h
+++ b/gnuradio-runtime/lib/flat_flowgraph.h
@@ -78,8 +78,7 @@ public:
private:
flat_flowgraph();
- block_detail_sptr allocate_block_detail(basic_block_sptr block);
- buffer_sptr allocate_buffer(basic_block_sptr block, int port);
+ void allocate_block_detail(basic_block_sptr block);
void connect_block_inputs(basic_block_sptr block);
/* When reusing a flowgraph's blocks, this call makes sure all of
diff --git a/gnuradio-runtime/lib/qa_buffer.cc b/gnuradio-runtime/lib/qa_buffer.cc
index c0e9c0d130..cefe548338 100644
--- a/gnuradio-runtime/lib/qa_buffer.cc
+++ b/gnuradio-runtime/lib/qa_buffer.cc
@@ -14,6 +14,8 @@
#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_double_mapped.h>
+#include <gnuradio/buffer_reader.h>
#include <gnuradio/random.h>
#include <boost/test/unit_test.hpp>
#include <cstdlib>
@@ -40,7 +42,8 @@ static void t0_body()
int nitems = 4000 / sizeof(int);
int counter = 0;
- gr::buffer_sptr buf(gr::make_buffer(nitems, sizeof(int), gr::block_sptr()));
+ gr::buffer_sptr buf(
+ gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
int last_sa;
int sa;
@@ -74,7 +77,8 @@ static void t1_body()
int write_counter = 0;
int read_counter = 0;
- gr::buffer_sptr buf(gr::make_buffer(nitems, sizeof(int), gr::block_sptr()));
+ gr::buffer_sptr buf(
+ gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr()));
int sa;
@@ -145,7 +149,8 @@ static void t2_body()
int nitems = (64 * (1L << 10)) / sizeof(int); // 64K worth of ints
- gr::buffer_sptr buf(gr::make_buffer(nitems, sizeof(int), gr::block_sptr()));
+ gr::buffer_sptr buf(
+ gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
gr::buffer_reader_sptr r1(gr::buffer_add_reader(buf, 0, gr::block_sptr()));
int read_counter = 0;
@@ -210,7 +215,8 @@ static void t3_body()
int nitems = (64 * (1L << 10)) / sizeof(int);
static const int N = 5;
- gr::buffer_sptr buf(gr::make_buffer(nitems, sizeof(int), gr::block_sptr()));
+ gr::buffer_sptr buf(
+ gr::make_buffer_double_mapped(nitems, sizeof(int), nitems, gr::block_sptr()));
gr::buffer_reader_sptr reader[N];
int read_counter[N];
int write_counter = 0;
diff --git a/gnuradio-runtime/lib/tpb_detail.cc b/gnuradio-runtime/lib/tpb_detail.cc
index 312a5bf5e2..badc4cf9ef 100644
--- a/gnuradio-runtime/lib/tpb_detail.cc
+++ b/gnuradio-runtime/lib/tpb_detail.cc
@@ -15,6 +15,7 @@
#include <gnuradio/block.h>
#include <gnuradio/block_detail.h>
#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_reader.h>
#include <gnuradio/tpb_detail.h>
namespace gr {
diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt b/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt
index 14eb501ddb..dbc74b0529 100644
--- a/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt
+++ b/gnuradio-runtime/python/gnuradio/gr/bindings/CMakeLists.txt
@@ -17,6 +17,7 @@ messages/msg_queue_python.cc
block_gateway_python.cc
# block_registry_python.cc
buffer_python.cc
+ buffer_reader_python.cc
constants_python.cc
endianness_python.cc
expj_python.cc
diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/basic_block_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/basic_block_python.cc
index f18fce2c9a..933d651fa7 100644
--- a/gnuradio-runtime/python/gnuradio/gr/bindings/basic_block_python.cc
+++ b/gnuradio-runtime/python/gnuradio/gr/bindings/basic_block_python.cc
@@ -13,8 +13,8 @@
/* If manual edits are made, the following tags should be modified accordingly. */
/* BINDTOOL_GEN_AUTOMATIC(0) */
/* BINDTOOL_USE_PYGCCXML(0) */
-/* BINDTOOL_HEADER_FILE(basic_block.h) */
-/* BINDTOOL_HEADER_FILE_HASH(9239cc3381582f5f44010485cd48fa72) */
+/* BINDTOOL_HEADER_FILE(basic_block.h) */
+/* BINDTOOL_HEADER_FILE_HASH(5c1d5b8a3666a2e0e7a6fafae07afa29) */
/***********************************************************************************/
#include <pybind11/complex.h>
diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/block_detail_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/block_detail_python.cc
index 219c95b153..979c19f2f2 100644
--- a/gnuradio-runtime/python/gnuradio/gr/bindings/block_detail_python.cc
+++ b/gnuradio-runtime/python/gnuradio/gr/bindings/block_detail_python.cc
@@ -13,8 +13,8 @@
/* If manual edits are made, the following tags should be modified accordingly. */
/* BINDTOOL_GEN_AUTOMATIC(0) */
/* BINDTOOL_USE_PYGCCXML(0) */
-/* BINDTOOL_HEADER_FILE(block_detail.h) */
-/* BINDTOOL_HEADER_FILE_HASH(274b4f673ac76cedd3ef6d466afab2fc) */
+/* BINDTOOL_HEADER_FILE(block_detail.h) */
+/* BINDTOOL_HEADER_FILE_HASH(61794baf0e727516eb0eedc08753a17a) */
/***********************************************************************************/
#include <pybind11/complex.h>
@@ -25,6 +25,7 @@ namespace py = pybind11;
#include <gnuradio/block_detail.h>
#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_reader.h>
// pydoc.h is automatically generated in the build directory
#include <block_detail_pydoc.h>
diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc
index 78c5b03cfe..9660a7f245 100644
--- a/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc
+++ b/gnuradio-runtime/python/gnuradio/gr/bindings/block_python.cc
@@ -14,7 +14,7 @@
/* BINDTOOL_GEN_AUTOMATIC(0) */
/* BINDTOOL_USE_PYGCCXML(0) */
/* BINDTOOL_HEADER_FILE(block.h) */
-/* BINDTOOL_HEADER_FILE_HASH(5df7331a717fb7c436eac2fc20991858) */
+/* BINDTOOL_HEADER_FILE_HASH(238d129ad018daa3146ff1d8867dc356) */
/***********************************************************************************/
#include <pybind11/complex.h>
diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc
index 89845d2e28..de7d4edf1c 100644
--- a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc
+++ b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_python.cc
@@ -13,8 +13,8 @@
/* If manual edits are made, the following tags should be modified accordingly. */
/* BINDTOOL_GEN_AUTOMATIC(0) */
/* BINDTOOL_USE_PYGCCXML(0) */
-/* BINDTOOL_HEADER_FILE(buffer.h) */
-/* BINDTOOL_HEADER_FILE_HASH(69b17e66fac6e29f860466846a64feab) */
+/* BINDTOOL_HEADER_FILE(buffer.h) */
+/* BINDTOOL_HEADER_FILE_HASH(e5247f4fe5b5873c66eed72880194981) */
/***********************************************************************************/
#include <pybind11/complex.h>
@@ -25,6 +25,7 @@ namespace py = pybind11;
#include <gnuradio/block.h>
#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_reader.h>
// pydoc.h is automatically generated in the build directory
#include <buffer_pydoc.h>
diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc
new file mode 100644
index 0000000000..23e2f39d10
--- /dev/null
+++ b/gnuradio-runtime/python/gnuradio/gr/bindings/buffer_reader_python.cc
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2020 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+/***********************************************************************************/
+/* This file is automatically generated using bindtool and can be manually edited */
+/* The following lines can be configured to regenerate this file during cmake */
+/* If manual edits are made, the following tags should be modified accordingly. */
+/* BINDTOOL_GEN_AUTOMATIC(0) */
+/* BINDTOOL_USE_PYGCCXML(0) */
+/* BINDTOOL_HEADER_FILE(buffer_reader.h) */
+/* BINDTOOL_HEADER_FILE_HASH(451fcbd61f40b7d17a151474869aad75) */
+/***********************************************************************************/
+
+#include <pybind11/complex.h>
+#include <pybind11/pybind11.h>
+#include <pybind11/stl.h>
+
+namespace py = pybind11;
+
+#include <gnuradio/block.h>
+#include <gnuradio/buffer_reader.h>
+// pydoc.h is automatically generated in the build directory
+#include <buffer_pydoc.h>
+
+void bind_buffer_reader(py::module& m)
+{
+
+ using buffer_reader = ::gr::buffer_reader;
+
+ py::class_<buffer_reader, std::shared_ptr<buffer_reader>>(
+ m, "buffer_reader", D(buffer_reader))
+
+ .def(py::init<gr::buffer_reader const&>(),
+ py::arg("arg0"),
+ D(buffer_reader, buffer_reader))
+
+
+ .def("declare_sample_delay",
+ &buffer_reader::declare_sample_delay,
+ py::arg("delay"),
+ D(buffer_reader, declare_sample_delay))
+
+
+ .def("sample_delay", &buffer_reader::sample_delay, D(buffer_reader, sample_delay))
+
+
+ .def("items_available",
+ &buffer_reader::items_available,
+ D(buffer_reader, items_available))
+
+
+ .def("buffer", &buffer_reader::buffer, D(buffer_reader, buffer))
+
+
+ .def("max_possible_items_available",
+ &buffer_reader::max_possible_items_available,
+ D(buffer_reader, max_possible_items_available))
+
+
+ .def("read_pointer", &buffer_reader::read_pointer, D(buffer_reader, read_pointer))
+
+
+ .def("update_read_pointer",
+ &buffer_reader::update_read_pointer,
+ py::arg("nitems"),
+ D(buffer_reader, update_read_pointer))
+
+
+ .def("set_done",
+ &buffer_reader::set_done,
+ py::arg("done"),
+ D(buffer_reader, set_done))
+
+
+ .def("done", &buffer_reader::done, D(buffer_reader, done))
+
+
+ .def("mutex", &buffer_reader::mutex, D(buffer_reader, mutex))
+
+
+ .def("nitems_read", &buffer_reader::nitems_read, D(buffer_reader, nitems_read))
+
+
+ .def("reset_nitem_counter",
+ &buffer_reader::reset_nitem_counter,
+ D(buffer_reader, reset_nitem_counter))
+
+
+ .def("get_sizeof_item",
+ &buffer_reader::get_sizeof_item,
+ D(buffer_reader, get_sizeof_item))
+
+
+ .def("link", &buffer_reader::link, D(buffer_reader, link))
+
+
+ .def("get_tags_in_range",
+ &buffer_reader::get_tags_in_range,
+ py::arg("v"),
+ py::arg("abs_start"),
+ py::arg("abs_end"),
+ py::arg("id"),
+ D(buffer_reader, get_tags_in_range))
+
+ ;
+
+
+ m.def("buffer_add_reader",
+ &::gr::buffer_add_reader,
+ py::arg("buf"),
+ py::arg("nzero_preload"),
+ py::arg("link") = gr::block_sptr(),
+ py::arg("delay") = 0,
+ D(buffer_add_reader));
+
+
+ m.def("buffer_reader_ncurrently_allocated",
+ &::gr::buffer_reader_ncurrently_allocated,
+ D(buffer_reader_ncurrently_allocated));
+}
diff --git a/gnuradio-runtime/python/gnuradio/gr/bindings/logger_python.cc b/gnuradio-runtime/python/gnuradio/gr/bindings/logger_python.cc
index 8ec43201fb..cccc60fe30 100644
--- a/gnuradio-runtime/python/gnuradio/gr/bindings/logger_python.cc
+++ b/gnuradio-runtime/python/gnuradio/gr/bindings/logger_python.cc
@@ -13,8 +13,8 @@
/* If manual edits are made, the following tags should be modified accordingly. */
/* BINDTOOL_GEN_AUTOMATIC(0) */
/* BINDTOOL_USE_PYGCCXML(0) */
-/* BINDTOOL_HEADER_FILE(logger.h) */
-/* BINDTOOL_HEADER_FILE_HASH(eaf28bcaeb7c34dc0fca3fdd4a9860c0) */
+/* BINDTOOL_HEADER_FILE(logger.h) */
+/* BINDTOOL_HEADER_FILE_HASH(6d4f118d476b888f79737f835ed27a4d) */
/***********************************************************************************/
#include <pybind11/complex.h>
diff --git a/gnuradio-runtime/python/gnuradio/gr_unittest.py b/gnuradio-runtime/python/gnuradio/gr_unittest.py
index 6177f0f32f..ebd47019a5 100644
--- a/gnuradio-runtime/python/gnuradio/gr_unittest.py
+++ b/gnuradio-runtime/python/gnuradio/gr_unittest.py
@@ -110,6 +110,25 @@ class TestCase(unittest.TestCase):
self.assertComplexAlmostEqual2(x, y, abs_eps, rel_eps, msg)
for (x, y) in zip(a, b)
])
+
+
+ def assertSequenceEqualGR(self, data_in, data_out):
+ """
+ Note this function exists because of this bug: https://bugs.python.org/issue19217
+ Calling self.assertEqual(seqA, seqB) can hang if seqA and seqB are not equal.
+ """
+ if len(data_in) != len(data_out):
+ print('Lengths do not match: {:d} -- {:d}'.format(len(data_in), len(data_out)))
+ self.assertTrue(len(data_in) == len(data_out))
+ total_miscompares = 0
+ for idx, item in enumerate(zip(data_in, data_out)):
+ if item[0] != item[1]:
+ total_miscompares += 1
+ print('Miscompare at: {:d} ({:d} -- {:d})'.format(idx, item[0], item[1]))
+ if total_miscompares > 0:
+ print('Total miscompares: {:d}'.format(total_miscompares))
+ self.assertTrue(total_miscompares == 0)
+
def waitFor(
self,
diff --git a/gr-audio/lib/portaudio/portaudio_sink.cc b/gr-audio/lib/portaudio/portaudio_sink.cc
index a197776adc..ea27c7c0f9 100644
--- a/gr-audio/lib/portaudio/portaudio_sink.cc
+++ b/gr-audio/lib/portaudio/portaudio_sink.cc
@@ -65,7 +65,8 @@ void portaudio_sink::create_ringbuffer(void)
(N_BUFFERS * bufsize_samples / d_output_parameters.channelCount));
// FYI, the buffer indices are in units of samples.
- d_writer = gr::make_buffer(N_BUFFERS * bufsize_samples, sizeof(sample_t));
+ d_writer = gr::make_buffer(
+ N_BUFFERS * bufsize_samples, sizeof(sample_t), N_BUFFERS * bufsize_samples);
d_reader = gr::buffer_add_reader(d_writer, 0);
}
diff --git a/gr-audio/lib/portaudio/portaudio_sink.h b/gr-audio/lib/portaudio/portaudio_sink.h
index 2e25e2f740..ae28705647 100644
--- a/gr-audio/lib/portaudio/portaudio_sink.h
+++ b/gr-audio/lib/portaudio/portaudio_sink.h
@@ -12,6 +12,7 @@
#include <gnuradio/audio/sink.h>
#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_reader.h>
#include <gnuradio/logger.h>
#include <gnuradio/thread/thread.h>
#include <portaudio.h>
diff --git a/gr-audio/lib/portaudio/portaudio_source.cc b/gr-audio/lib/portaudio/portaudio_source.cc
index 86a7f16b48..63a3d5e1ba 100644
--- a/gr-audio/lib/portaudio/portaudio_source.cc
+++ b/gr-audio/lib/portaudio/portaudio_source.cc
@@ -64,7 +64,8 @@ void portaudio_source::create_ringbuffer(void)
(N_BUFFERS * bufsize_samples / d_input_parameters.channelCount));
// FYI, the buffer indices are in units of samples.
- d_writer = gr::make_buffer(N_BUFFERS * bufsize_samples, sizeof(sample_t));
+ d_writer = gr::make_buffer(
+ N_BUFFERS * bufsize_samples, sizeof(sample_t), N_BUFFERS * bufsize_samples);
d_reader = gr::buffer_add_reader(d_writer, 0);
}
diff --git a/gr-audio/lib/portaudio/portaudio_source.h b/gr-audio/lib/portaudio/portaudio_source.h
index ecaf5a1c5b..baab6bee42 100644
--- a/gr-audio/lib/portaudio/portaudio_source.h
+++ b/gr-audio/lib/portaudio/portaudio_source.h
@@ -13,6 +13,7 @@
#include <gnuradio/audio/source.h>
#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_reader.h>
#include <gnuradio/logger.h>
#include <gnuradio/thread/thread.h>
#include <portaudio.h>
diff --git a/gr-blocks/lib/qa_gr_block.cc b/gr-blocks/lib/qa_gr_block.cc
index ff88902e98..767a252ddb 100644
--- a/gr-blocks/lib/qa_gr_block.cc
+++ b/gr-blocks/lib/qa_gr_block.cc
@@ -15,6 +15,7 @@
#include <gnuradio/block.h>
#include <gnuradio/blocks/null_sink.h>
#include <gnuradio/blocks/null_source.h>
+#include <gnuradio/buffer_reader.h>
#include <boost/test/unit_test.hpp>
BOOST_AUTO_TEST_CASE(t0)
diff --git a/gr-blocks/lib/qa_gr_top_block.cc b/gr-blocks/lib/qa_gr_top_block.cc
index 6a9e0f5467..5374a67220 100644
--- a/gr-blocks/lib/qa_gr_top_block.cc
+++ b/gr-blocks/lib/qa_gr_top_block.cc
@@ -20,7 +20,7 @@
#include <boost/test/unit_test.hpp>
#include <iostream>
-#define VERBOSE 0
+#define VERBOSE 1
BOOST_AUTO_TEST_CASE(t0)
{
@@ -250,6 +250,7 @@ BOOST_AUTO_TEST_CASE(t10_reconfig_max_output_buffer)
// Reconfigure with gr_head in the middle
tb->lock();
+
gr::block_sptr nop = gr::blocks::nop::make(sizeof(int));
nop->set_max_output_buffer(4000);
tb->disconnect(src, 0, dst, 0);
diff --git a/gr-fec/python/fec/_qa_helper.py b/gr-fec/python/fec/_qa_helper.py
index 8a1b37ec95..00a6015684 100644
--- a/gr-fec/python/fec/_qa_helper.py
+++ b/gr-fec/python/fec/_qa_helper.py
@@ -46,8 +46,8 @@ class _qa_helper(gr.top_block):
self.threading = threading
self.ext_encoder = extended_encoder(enc, threading=self.threading, puncpat=self.puncpat)
- self.ext_decoder= extended_decoder(dec, threading=self.threading, ann=None,
- puncpat=self.puncpat, integration_period=10000)
+ self.ext_decoder = extended_decoder(dec, threading=self.threading, ann=None,
+ puncpat=self.puncpat, integration_period=10000)
self.src = blocks.vector_source_b(data_size*[0, 1, 2, 3, 5, 7, 9, 13, 15, 25, 31, 45, 63, 95, 127], False)
self.unpack = blocks.unpack_k_bits_bb(8)
diff --git a/gr-fec/python/fec/extended_encoder.py b/gr-fec/python/fec/extended_encoder.py
index 70422da01e..fbcbeb1be1 100644
--- a/gr-fec/python/fec/extended_encoder.py
+++ b/gr-fec/python/fec/extended_encoder.py
@@ -26,8 +26,8 @@ class extended_encoder(gr.hier_block2):
self.blocks=[]
self.puncpat=puncpat
- if(type(encoder_obj_list) == list):
- if(type(encoder_obj_list[0]) == list):
+ if (type(encoder_obj_list) == list):
+ if (type(encoder_obj_list[0]) == list):
gr.log.info("fec.extended_encoder: Parallelism must be 1.")
raise AttributeError
else:
diff --git a/gr-fec/python/fec/qa_depuncture.py b/gr-fec/python/fec/qa_depuncture.py
index 0fdd795c0c..763a40f242 100644
--- a/gr-fec/python/fec/qa_depuncture.py
+++ b/gr-fec/python/fec/qa_depuncture.py
@@ -65,7 +65,7 @@ class test_depuncture (gr_unittest.TestCase):
for i in range(len(dst_data)):
dst_data[i] = int(dst_data[i])
- self.assertEqual(self.expected, dst_data)
+ self.assertSequenceEqualGR(self.expected, dst_data)
def test_001(self):
# Test normal operation of the depuncture block with a delay
@@ -90,7 +90,7 @@ class test_depuncture (gr_unittest.TestCase):
for i in range(len(dst_data)):
dst_data[i] = int(dst_data[i])
- self.assertEqual(self.expected, dst_data)
+ self.assertSequenceEqualGR(self.expected, dst_data)
def test_002(self):
# Test scenario where we have defined a puncture pattern with
@@ -116,7 +116,7 @@ class test_depuncture (gr_unittest.TestCase):
for i in range(len(dst_data)):
dst_data[i] = int(dst_data[i])
- self.assertEqual(self.expected, dst_data)
+ self.assertSequenceEqualGR(self.expected, dst_data)
def test_003(self):
# Test scenario where we have defined a puncture pattern with
@@ -150,7 +150,7 @@ class test_depuncture (gr_unittest.TestCase):
for i in range(len(dst_data1)):
dst_data1[i] = int(dst_data1[i])
- self.assertEqual(dst_data1, dst_data0)
+ self.assertSequenceEqualGR(dst_data1, dst_data0)
def test_004(self):
# Test normal operation of the depuncture block without
@@ -176,7 +176,7 @@ class test_depuncture (gr_unittest.TestCase):
for i in range(len(dst_data)):
dst_data[i] = int(dst_data[i])
- self.assertEqual(self.expected, dst_data)
+ self.assertSequenceEqualGR(self.expected, dst_data)
if __name__ == '__main__':
diff --git a/gr-fec/python/fec/qa_fecapi_dummy.py b/gr-fec/python/fec/qa_fecapi_dummy.py
index f1de85e526..fa1abdf789 100644
--- a/gr-fec/python/fec/qa_fecapi_dummy.py
+++ b/gr-fec/python/fec/qa_fecapi_dummy.py
@@ -26,7 +26,7 @@ class test_fecapi_dummy(gr_unittest.TestCase):
def tearDown(self):
self.tb = None
-
+
def test_parallelism0_00(self):
frame_size = 30
enc = fec.dummy_encoder_make(frame_size * 8)
@@ -39,7 +39,7 @@ class test_fecapi_dummy(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
- self.assertEqual(data_in, data_out)
+ self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism0_01(self):
frame_size = 30
@@ -52,8 +52,8 @@ class test_fecapi_dummy(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
-
- self.assertEqual(data_in, data_out)
+
+ self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism0_02(self):
frame_size = 30
@@ -66,8 +66,8 @@ class test_fecapi_dummy(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
-
- self.assertEqual(data_in, data_out)
+
+ self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism1_00(self):
frame_size = 30
@@ -83,7 +83,7 @@ class test_fecapi_dummy(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
- self.assertEqual(data_in, data_out)
+ self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism1_01(self):
frame_size = 30
@@ -99,7 +99,7 @@ class test_fecapi_dummy(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
- self.assertEqual(data_in, data_out)
+ self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism1_02(self):
frame_size = 300
@@ -111,11 +111,10 @@ class test_fecapi_dummy(gr_unittest.TestCase):
self.test = _qa_helper(10 * frame_size, enc, dec, threading)
self.tb.connect(self.test)
self.tb.run()
-
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
-
- self.assertEqual(data_in, data_out)
+
+ self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism1_03(self):
frame_size = 30
@@ -132,7 +131,7 @@ class test_fecapi_dummy(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
- self.assertEqual(data_in, data_out)
+ self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism1_04(self):
frame_size = 30
@@ -149,7 +148,7 @@ class test_fecapi_dummy(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
- self.assertEqual(data_in, data_out)
+ self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism1_05(self):
frame_size = 30
@@ -257,10 +256,10 @@ class test_fecapi_dummy(gr_unittest.TestCase):
data = list(data)
packed_data = list(packed_data)
- self.assertListEqual(packed_data, r0)
- self.assertListEqual(data, r1)
- self.assertListEqual(packed_data, r2)
- self.assertListEqual(data, r3)
+ self.assertSequenceEqualGR(packed_data, r0)
+ self.assertSequenceEqualGR(data, r1)
+ self.assertSequenceEqualGR(packed_data, r2)
+ self.assertSequenceEqualGR(data, r3)
if __name__ == '__main__':
diff --git a/gr-fec/python/fec/qa_fecapi_repetition.py b/gr-fec/python/fec/qa_fecapi_repetition.py
index 9d560b32ef..ec55e2d83d 100644
--- a/gr-fec/python/fec/qa_fecapi_repetition.py
+++ b/gr-fec/python/fec/qa_fecapi_repetition.py
@@ -36,7 +36,7 @@ class test_fecapi_repetition(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
- self.assertEqual(data_in, data_out)
+ self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism0_01(self):
frame_size = 30
@@ -51,7 +51,7 @@ class test_fecapi_repetition(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
- self.assertEqual(data_in, data_out)
+ self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism0_02(self):
frame_size = 30
@@ -66,7 +66,7 @@ class test_fecapi_repetition(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
- self.assertEqual(data_in, data_out)
+ self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism1_00(self):
frame_size = 30
@@ -83,7 +83,7 @@ class test_fecapi_repetition(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
- self.assertEqual(data_in, data_out)
+ self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism1_01(self):
frame_size = 30
@@ -100,7 +100,7 @@ class test_fecapi_repetition(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
- self.assertEqual(data_in, data_out)
+ self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism1_02(self):
frame_size = 300
@@ -117,7 +117,7 @@ class test_fecapi_repetition(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
- self.assertEqual(data_in, data_out)
+ self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism1_03(self):
frame_size = 30
@@ -135,7 +135,7 @@ class test_fecapi_repetition(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
- self.assertEqual(data_in, data_out)
+ self.assertSequenceEqualGR(data_in, data_out)
def test_parallelism1_04(self):
frame_size = 30
@@ -153,7 +153,7 @@ class test_fecapi_repetition(gr_unittest.TestCase):
data_in = self.test.snk_input.data()
data_out = self.test.snk_output.data()
- self.assertEqual(data_in, data_out)
+ self.assertSequenceEqualGR(data_in, data_out)
if __name__ == '__main__':