diff options
author | David Sorber <david.sorber@blacklynx.tech> | 2021-10-25 07:12:03 -0400 |
---|---|---|
committer | mormj <34754695+mormj@users.noreply.github.com> | 2021-10-25 11:27:01 -0400 |
commit | 9147c6eb99862697b454471b7882853e355f84cb (patch) | |
tree | acdbdf5b15e739320673f8a3a2699fde6b8b514a /gnuradio-runtime/lib | |
parent | d4bd90853f499d5b65a61b3b7bf9ecf50e68bf6b (diff) |
runtime: add logic to call the input_blocked_callback() if a sink block
determines that it is input blocked
Signed-off-by: David Sorber <david.sorber@blacklynx.tech>
Diffstat (limited to 'gnuradio-runtime/lib')
-rw-r--r-- | gnuradio-runtime/lib/block_executor.cc | 26 |
1 files changed, 26 insertions, 0 deletions
diff --git a/gnuradio-runtime/lib/block_executor.cc b/gnuradio-runtime/lib/block_executor.cc index ea59196571..e175612431 100644 --- a/gnuradio-runtime/lib/block_executor.cc +++ b/gnuradio-runtime/lib/block_executor.cc @@ -355,6 +355,32 @@ block_executor::state block_executor::run_one_iteration() goto were_done; max_items_avail = std::max(max_items_avail, d_ninput_items[i]); + + // Estimate if we are going to be blocked so we can call the input blocked + // callback on the offending input + int tmp_noutput_items = (int)(max_items_avail * m->relative_rate()); + tmp_noutput_items = round_down(tmp_noutput_items, m->output_multiple()); + tmp_noutput_items = std::min(tmp_noutput_items, max_noutput_items); + if (tmp_noutput_items == 0) { + // NOTE: normally input_blkd_cb_ready() and input_blocked_callback() + // need "ninput_items_required" but it hasn't been calculated + // yet, so instead guesstimate input required is one more than currently + // available and see if that unblocks the input. + buffer_reader_sptr in_buf = d->input(i); + if (in_buf->input_blkd_cb_ready(d_ninput_items[i] + 1)) { + gr::custom_lock lock(std::ref(*in_buf->mutex()), in_buf->buffer()); + if (in_buf->input_blocked_callback(d_ninput_items[i] + 1, + d_ninput_items[i])) { + LOG(std::ostringstream msg; msg << m << " -- BLKD_IN"; + GR_LOG_INFO(d_logger, msg.str())); + return BLKD_IN; + } + + // Recalculate after successfully executing the input blocked callback + d_ninput_items[i] = in_buf->items_available(); + max_items_avail = std::max(max_items_avail, d_ninput_items[i]); + } + } } // take a swag at how much output we can sink |