summary | refs | log | tree | commit | diff
path: root/gnuradio-runtime/lib/flat_flowgraph.cc
diff options
context:
space:
mode:
author: David Sorber <david.sorber@blacklynx.tech> 2021-05-12 08:59:21 -0400
committer: mormj <34754695+mormj@users.noreply.github.com> 2021-10-25 11:27:01 -0400
commit: 788827ae116bef871e144abd39b1e4482208eabe (patch)
tree: dcfee04a77db5bb3c8042be5b0b95c54bf8759c9 /gnuradio-runtime/lib/flat_flowgraph.cc
parent: b8713810a2d07ac1a632bd7bfb23f3f48f67e222 (diff)
runtime: Custom Buffer/Accelerator Device Support - Milestone 1
Custom Buffer/Accelerator Device Support - Milestone 1 changes:

* Refactored existing single mapped buffer code and created single mapped buffer abstraction; wrapping within single mapped buffers is handled explicitly by input blocked and output blocked callbacks that are called from block_executor
* Added simple custom buffer allocation interface (NOTE: this interface will change for milestone 2)
* Accelerated blocks are still responsible for data transfer but the custom buffer interface eliminates the double copy problem

Signed-off-by: David Sorber <david.sorber@blacklynx.tech>
Diffstat (limited to 'gnuradio-runtime/lib/flat_flowgraph.cc')
-rw-r--r-- gnuradio-runtime/lib/flat_flowgraph.cc 206
1 files changed, 116 insertions, 90 deletions
diff --git a/gnuradio-runtime/lib/flat_flowgraph.cc b/gnuradio-runtime/lib/flat_flowgraph.cc
index 0df75553e0..b9986f8370 100644
--- a/gnuradio-runtime/lib/flat_flowgraph.cc
+++ b/gnuradio-runtime/lib/flat_flowgraph.cc
@@ -15,6 +15,9 @@
#include "flat_flowgraph.h"
#include <gnuradio/block_detail.h>
#include <gnuradio/buffer.h>
+#include <gnuradio/buffer_reader.h>
+#include <gnuradio/buffer_type.h>
+#include <gnuradio/integer_math.h>
#include <gnuradio/logger.h>
#include <gnuradio/prefs.h>
#include <volk/volk.h>
@@ -24,6 +27,7 @@
namespace gr {
+
// 32Kbyte buffer size between blocks
#define GR_FIXED_BUFFER_SIZE (32 * (1L << 10))
@@ -44,8 +48,9 @@ void flat_flowgraph::setup_connections()
basic_block_vector_t blocks = calc_used_blocks();
// Assign block details to blocks
- for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++)
- cast_to_block_sptr(*p)->set_detail(allocate_block_detail(*p));
+ for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
+ allocate_block_detail(*p);
+ }
// Connect inputs to outputs for each block
for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
@@ -56,7 +61,7 @@ void flat_flowgraph::setup_connections()
block->set_is_unaligned(false);
}
- // Connect message ports connetions
+ // Connect message ports connections
for (msg_edge_viter_t i = d_msg_edges.begin(); i != d_msg_edges.end(); i++) {
GR_LOG_DEBUG(
d_debug_logger,
@@ -67,11 +72,10 @@ void flat_flowgraph::setup_connections()
}
}
-block_detail_sptr flat_flowgraph::allocate_block_detail(basic_block_sptr block)
+void flat_flowgraph::allocate_block_detail(basic_block_sptr block)
{
int ninputs = calc_used_ports(block, true).size();
int noutputs = calc_used_ports(block, false).size();
- block_detail_sptr detail = make_block_detail(ninputs, noutputs);
block_sptr grblock = cast_to_block_sptr(block);
if (!grblock)
@@ -80,98 +84,88 @@ block_detail_sptr flat_flowgraph::allocate_block_detail(basic_block_sptr block)
block->alias())
.str());
- GR_LOG_DEBUG(d_debug_logger, "Creating block detail for " + block->identifier());
+ // Determine the downstream max per output port
+ std::vector<int> downstream_max_nitems(noutputs, 0);
+ std::vector<uint64_t> downstream_lcm_nitems(noutputs, 1);
+#ifdef BUFFER_DEBUG
+ std::ostringstream msg;
+ msg << "BLOCK: " << block->identifier();
+ GR_LOG_DEBUG(d_logger, msg.str()); // could also be d_debug_logger
+#endif
for (int i = 0; i < noutputs; i++) {
- grblock->expand_minmax_buffer(i);
+ int nitems = 0;
+ uint64_t lcm_nitems = 1;
+ basic_block_vector_t downstream_blocks = calc_downstream_blocks(grblock, i);
+ for (basic_block_viter_t blk = downstream_blocks.begin();
+ blk != downstream_blocks.end();
+ blk++) {
+ block_sptr dgrblock = cast_to_block_sptr(*blk);
+ if (!dgrblock)
+ throw std::runtime_error("allocate_buffer found non-gr::block");
+
+#ifdef BUFFER_DEBUG
+ msg.str("");
+ msg << " DWNSTRM BLOCK: " << dgrblock->identifier();
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
- buffer_sptr buffer = allocate_buffer(block, i);
- GR_LOG_DEBUG(d_debug_logger,
- "Allocated buffer for output " + block->identifier() + " " +
- std::to_string(i));
- detail->set_output(i, buffer);
-
- // Update the block's max_output_buffer based on what was actually allocated.
- if ((grblock->max_output_buffer(i) != buffer->bufsize()) &&
- (grblock->max_output_buffer(i) != -1))
- GR_LOG_WARN(d_logger,
- boost::format("Block (%1%) max output buffer set to %2%"
- " instead of requested %3%") %
- grblock->alias() % buffer->bufsize() %
- grblock->max_output_buffer(i));
- grblock->set_max_output_buffer(i, buffer->bufsize());
- }
+ // If any downstream blocks are decimators and/or have a large
+ // output_multiple, ensure we have a buffer at least twice their
+ // decimation factor*output_multiple
+ double decimation = (1.0 / dgrblock->relative_rate());
+ int multiple = dgrblock->output_multiple();
+ int history = dgrblock->history();
+ nitems =
+ std::max(nitems, static_cast<int>(2 * (decimation * multiple + history)));
+
+ // Calculate the LCM of downstream reader nitems
+#ifdef BUFFER_DEBUG
+ msg.str("");
+ msg << " OUT MULTIPLE: " << multiple;
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
- return detail;
-}
+ if (dgrblock->fixed_rate()) {
+ lcm_nitems = GR_LCM(lcm_nitems,
+ (uint64_t)(dgrblock->fixed_rate_noutput_to_ninput(1) -
+ (dgrblock->history() - 1)));
+ }
+ if (dgrblock->relative_rate() != 1.0) {
+ // Relative rate
+ lcm_nitems = GR_LCM(lcm_nitems, dgrblock->relative_rate_d());
+ }
-buffer_sptr flat_flowgraph::allocate_buffer(basic_block_sptr block, int port)
-{
- block_sptr grblock = cast_to_block_sptr(block);
- if (!grblock)
- throw std::runtime_error("allocate_buffer found non-gr::block");
- int item_size = block->output_signature()->sizeof_stream_item(port);
-
- // *2 because we're now only filling them 1/2 way in order to
- // increase the available parallelism when using the TPB scheduler.
- // (We're double buffering, where we used to single buffer)
- int nitems = s_fixed_buffer_size * 2 / item_size;
-
- // Make sure there are at least twice the output_multiple no. of items
- if (nitems < 2 * grblock->output_multiple()) // Note: this means output_multiple()
- nitems = 2 * grblock->output_multiple(); // can't be changed by block dynamically
-
- // If any downstream blocks are decimators and/or have a large output_multiple,
- // ensure we have a buffer at least twice their decimation factor*output_multiple
- basic_block_vector_t blocks = calc_downstream_blocks(block, port);
-
- // limit buffer size if indicated
- if (grblock->max_output_buffer(port) > 0) {
- // GR_LOG_INFO(d_debug_logger, boost::format("constraining output items to %d")
- // % block->max_output_buffer(port));
- nitems = std::min((long)nitems, (long)grblock->max_output_buffer(port));
- nitems -= nitems % grblock->output_multiple();
- if (nitems < 1)
- throw std::runtime_error("problems allocating a buffer with the given max "
- "output buffer constraint!");
- } else if (grblock->min_output_buffer(port) > 0) {
- nitems = std::max((long)nitems, (long)grblock->min_output_buffer(port));
- nitems -= nitems % grblock->output_multiple();
- if (nitems < 1)
- throw std::runtime_error("problems allocating a buffer with the given min "
- "output buffer constraint!");
- }
+ // Sanity check, make sure lcm_nitems is at least 1
+ if (lcm_nitems < 1) {
+ lcm_nitems = 1;
+ }
- for (basic_block_viter_t p = blocks.begin(); p != blocks.end(); p++) {
- block_sptr dgrblock = cast_to_block_sptr(*p);
- if (!dgrblock)
- throw std::runtime_error("allocate_buffer found non-gr::block");
-
- double decimation = (1.0 / dgrblock->relative_rate());
- int multiple = dgrblock->output_multiple();
- int history = dgrblock->history();
- nitems =
- std::max(nitems, static_cast<int>(2 * (decimation * multiple + history)));
- }
+#ifdef BUFFER_DEBUG
+ msg.str("");
+ msg << " NINPUT_ITEMS: " << nitems;
+ GR_LOG_DEBUG(d_logger, msg.str());
- // std::cout << "make_buffer(" << nitems << ", " << item_size << ", " << grblock <<
- // "\n";
- // We're going to let this fail once and retry. If that fails,
- // throw and exit.
- buffer_sptr b;
- try {
- b = make_buffer(nitems, item_size, grblock);
- } catch (std::bad_alloc&) {
- b = make_buffer(nitems, item_size, grblock);
- }
+ msg.str("");
+ msg << " LCM NITEMS: " << lcm_nitems;
+ GR_LOG_DEBUG(d_logger, msg.str());
- // Set the max noutput items size here to make sure it's always
- // set in the block and available in the start() method.
- // But don't overwrite if the user has set this externally.
- if (!grblock->is_set_max_noutput_items())
- grblock->set_max_noutput_items(nitems);
+ msg.str("");
+ msg << " HISTORY: " << dgrblock->history();
+ GR_LOG_DEBUG(d_logger, msg.str());
- return b;
+ msg.str("");
+ msg << " DELAY: " << dgrblock->sample_delay(0);
+ GR_LOG_DEBUG(d_logger, msg.str());
+#endif
+ }
+ downstream_max_nitems[i] = nitems;
+ downstream_lcm_nitems[i] = lcm_nitems;
+ }
+
+ // Allocate the block detail and necessary buffers
+ grblock->allocate_detail(
+ ninputs, noutputs, downstream_max_nitems, downstream_lcm_nitems);
}
void flat_flowgraph::connect_block_inputs(basic_block_sptr block)
@@ -194,8 +188,40 @@ void flat_flowgraph::connect_block_inputs(basic_block_sptr block)
block_sptr src_grblock = cast_to_block_sptr(src_block);
if (!src_grblock)
throw std::runtime_error("connect_block_inputs found non-gr::block");
- buffer_sptr src_buffer = src_grblock->detail()->output(src_port);
+ buffer_sptr src_buffer;
+ buffer_type_t src_buf_type = src_grblock->get_buffer_type();
+ buffer_type_t dest_buf_type = grblock->get_buffer_type();
+ if (dest_buf_type == buftype_DEFAULT_NON_CUSTOM::get() ||
+ dest_buf_type == src_buf_type) {
+ // The block is not using a custom buffer OR the block and the upstream
+ // block both use the same kind of custom buffer
+ src_buffer = src_grblock->detail()->output(src_port);
+ } else {
+ if (dest_buf_type != buftype_DEFAULT_NON_CUSTOM::get() &&
+ src_buf_type == buftype_DEFAULT_NON_CUSTOM::get()) {
+ // The block uses a custom buffer but the upstream block does not
+ // therefore the upstream block's buffer can be replaced with the
+ // type of buffer that the block needs
+ std::ostringstream msg;
+ msg << "Block: " << grblock->identifier()
+ << "replacing upstream block: " << src_grblock->identifier()
+ << " buffer with a custom buffer";
+ GR_LOG_DEBUG(d_debug_logger, msg.str());
+ src_buffer = src_grblock->replace_buffer(src_port, grblock);
+ } else {
+ // Both the block and upstream block use incompatible buffer types
+ // which is not currently allowed
+ std::ostringstream msg;
+ msg << "Block: " << grblock->identifier()
+ << " and upstream block: " << src_grblock->identifier()
+ << " use incompatible custom buffer types (" << dest_buf_type.name()
+ << " -- " << src_buf_type.name() << ") --> "
+ << (dest_buf_type == src_buf_type);
+ GR_LOG_ERROR(d_logger, msg.str());
+ throw std::runtime_error(msg.str());
+ }
+ }
GR_LOG_DEBUG(d_debug_logger,
"Setting input " + std::to_string(dst_port) + " from edge " +
@@ -220,7 +246,7 @@ void flat_flowgraph::merge_connections(flat_flowgraph_sptr old_ffg)
if (!block->detail()) {
GR_LOG_DEBUG(d_debug_logger,
"merge: allocating new detail for block " + block->identifier());
- block->set_detail(allocate_block_detail(block));
+ allocate_block_detail(block);
} else {
GR_LOG_DEBUG(d_debug_logger,
"merge: reusing original detail for block " +