summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/lib/single_threaded_scheduler.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-runtime/lib/single_threaded_scheduler.cc')
-rw-r--r--gnuradio-runtime/lib/single_threaded_scheduler.cc363
1 files changed, 363 insertions, 0 deletions
diff --git a/gnuradio-runtime/lib/single_threaded_scheduler.cc b/gnuradio-runtime/lib/single_threaded_scheduler.cc
new file mode 100644
index 0000000000..0d7f91670b
--- /dev/null
+++ b/gnuradio-runtime/lib/single_threaded_scheduler.cc
@@ -0,0 +1,363 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2004 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <gnuradio/single_threaded_scheduler.h>
+#include <gnuradio/block.h>
+#include <gnuradio/block_detail.h>
+#include <gnuradio/buffer.h>
+#include <boost/thread.hpp>
+#include <boost/format.hpp>
+#include <iostream>
+#include <limits>
+#include <assert.h>
+#include <stdio.h>
+
+namespace gr {
+
+ // must be defined to either 0 or 1
+#define ENABLE_LOGGING 0
+
+#if (ENABLE_LOGGING)
+#define LOG(x) do { x; } while(0)
+#else
+#define LOG(x) do {;} while(0)
+#endif
+
+ static int which_scheduler = 0;
+
+ single_threaded_scheduler_sptr
+ make_single_threaded_scheduler(const std::vector<block_sptr> &blocks)
+ {
+ return single_threaded_scheduler_sptr
+ (new single_threaded_scheduler(blocks));
+ }
+
+ single_threaded_scheduler::single_threaded_scheduler(const std::vector<block_sptr> &blocks)
+ : d_blocks(blocks), d_enabled(true), d_log(0)
+ {
+ if(ENABLE_LOGGING) {
+ std::string name = str(boost::format("sst-%d.log") % which_scheduler++);
+ d_log = new std::ofstream(name.c_str());
+ *d_log << "single_threaded_scheduler: "
+ << d_blocks.size ()
+ << " blocks\n";
+ }
+ }
+
+ single_threaded_scheduler::~single_threaded_scheduler()
+ {
+ if(ENABLE_LOGGING)
+ delete d_log;
+ }
+
+ void
+ single_threaded_scheduler::run()
+ {
+ // d_enabled = true; // KLUDGE
+ main_loop ();
+ }
+
+ void
+ single_threaded_scheduler::stop()
+ {
+ if(0)
+ std::cout << "gr_singled_threaded_scheduler::stop() "
+ << this << std::endl;
+ d_enabled = false;
+ }
+
+ inline static unsigned int
+ round_up(unsigned int n, unsigned int multiple)
+ {
+ return ((n + multiple - 1) / multiple) * multiple;
+ }
+
+ inline static unsigned int
+ round_down(unsigned int n, unsigned int multiple)
+ {
+ return (n / multiple) * multiple;
+ }
+
+ //
+ // Return minimum available write space in all our downstream
+ // buffers or -1 if we're output blocked and the output we're
+ // blocked on is done.
+ //
+ static int
+ min_available_space(block_detail *d, int output_multiple)
+ {
+ int min_space = std::numeric_limits<int>::max();
+
+ for(int i = 0; i < d->noutputs (); i++) {
+ int n = round_down (d->output(i)->space_available (), output_multiple);
+ if(n == 0) { // We're blocked on output.
+ if(d->output(i)->done()) { // Downstream is done, therefore we're done.
+ return -1;
+ }
+ return 0;
+ }
+ min_space = std::min (min_space, n);
+ }
+ return min_space;
+ }
+
+ void
+ single_threaded_scheduler::main_loop()
+ {
+ static const int DEFAULT_CAPACITY = 16;
+
+ int noutput_items;
+ gr_vector_int ninput_items_required(DEFAULT_CAPACITY);
+ gr_vector_int ninput_items(DEFAULT_CAPACITY);
+ gr_vector_const_void_star input_items(DEFAULT_CAPACITY);
+ gr_vector_void_star output_items(DEFAULT_CAPACITY);
+ unsigned int bi;
+ unsigned int nalive;
+ int max_items_avail;
+ bool made_progress_last_pass;
+ bool making_progress;
+
+ for(unsigned i = 0; i < d_blocks.size (); i++)
+ d_blocks[i]->detail()->set_done (false); // reset any done flags
+
+ for(unsigned i = 0; i < d_blocks.size (); i++) // enable any drivers, etc.
+ d_blocks[i]->start();
+
+ bi = 0;
+ made_progress_last_pass = true;
+ making_progress = false;
+
+ // Loop while there are still blocks alive
+
+ nalive = d_blocks.size ();
+ while(d_enabled && nalive > 0) {
+ if(boost::this_thread::interruption_requested())
+ break;
+
+ block *m = d_blocks[bi].get ();
+ block_detail *d = m->detail().get ();
+
+ LOG(*d_log << std::endl << m);
+
+ if(d->done ())
+ goto next_block;
+
+ if(d->source_p ()) {
+ // Invoke sources as a last resort. As long as the previous
+ // pass made progress, don't call a source.
+ if(made_progress_last_pass) {
+ LOG(*d_log << " Skipping source\n");
+ goto next_block;
+ }
+
+ ninput_items_required.resize (0);
+ ninput_items.resize (0);
+ input_items.resize (0);
+ output_items.resize (d->noutputs ());
+
+ // determine the minimum available output space
+ noutput_items = min_available_space (d, m->output_multiple ());
+ LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
+ if(noutput_items == -1) // we're done
+ goto were_done;
+
+ if(noutput_items == 0) { // we're output blocked
+ LOG(*d_log << " BLKD_OUT\n");
+ goto next_block;
+ }
+
+ goto setup_call_to_work; // jump to common code
+ }
+
+ else if(d->sink_p ()) {
+ ninput_items_required.resize (d->ninputs ());
+ ninput_items.resize (d->ninputs ());
+ input_items.resize (d->ninputs ());
+ output_items.resize (0);
+ LOG(*d_log << " sink\n");
+
+ max_items_avail = 0;
+ for(int i = 0; i < d->ninputs (); i++) {
+ ninput_items[i] = d->input(i)->items_available();
+ //if (ninput_items[i] == 0 && d->input(i)->done())
+ if(ninput_items[i] < m->output_multiple() && d->input(i)->done())
+ goto were_done;
+
+ max_items_avail = std::max (max_items_avail, ninput_items[i]);
+ }
+
+ // take a swag at how much output we can sink
+ noutput_items = (int) (max_items_avail * m->relative_rate ());
+ noutput_items = round_down (noutput_items, m->output_multiple ());
+ LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl);
+ LOG(*d_log << " noutput_items = " << noutput_items << std::endl);
+
+ if(noutput_items == 0) { // we're blocked on input
+ LOG(*d_log << " BLKD_IN\n");
+ goto next_block;
+ }
+
+ goto try_again; // Jump to code shared with regular case.
+ }
+
+ else {
+ // do the regular thing
+ ninput_items_required.resize(d->ninputs ());
+ ninput_items.resize(d->ninputs ());
+ input_items.resize(d->ninputs ());
+ output_items.resize(d->noutputs ());
+
+ max_items_avail = 0;
+ for(int i = 0; i < d->ninputs (); i++) {
+ ninput_items[i] = d->input(i)->items_available ();
+ max_items_avail = std::max(max_items_avail, ninput_items[i]);
+ }
+
+ // determine the minimum available output space
+ noutput_items = min_available_space(d, m->output_multiple ());
+ if(ENABLE_LOGGING){
+ *d_log << " regular ";
+ if(m->relative_rate() >= 1.0)
+ *d_log << "1:" << m->relative_rate() << std::endl;
+ else
+ *d_log << 1.0/m->relative_rate() << ":1\n";
+ *d_log << " max_items_avail = " << max_items_avail << std::endl;
+ *d_log << " noutput_items = " << noutput_items << std::endl;
+ }
+ if(noutput_items == -1) // we're done
+ goto were_done;
+
+ if(noutput_items == 0) { // we're output blocked
+ LOG(*d_log << " BLKD_OUT\n");
+ goto next_block;
+ }
+
+#if 0
+ // Compute best estimate of noutput_items that we can really use.
+ noutput_items =
+ std::min((unsigned)noutput_items,
+ std::max((unsigned)m->output_multiple(),
+ round_up((unsigned)(max_items_avail * m->relative_rate()),
+ m->output_multiple())));
+
+ LOG(*d_log << " revised noutput_items = " << noutput_items << std::endl);
+#endif
+
+ try_again:
+ if(m->fixed_rate()) {
+ // try to work it forward starting with max_items_avail.
+ // We want to try to consume all the input we've got.
+ int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
+ reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
+ if(reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
+ noutput_items = reqd_noutput_items;
+ }
+
+ // ask the block how much input they need to produce noutput_items
+ m->forecast(noutput_items, ninput_items_required);
+
+ // See if we've got sufficient input available
+ int i;
+ for(i = 0; i < d->ninputs (); i++)
+ if(ninput_items_required[i] > ninput_items[i]) // not enough
+ break;
+
+ if(i < d->ninputs()) { // not enough input on input[i]
+ // if we can, try reducing the size of our output request
+ if(noutput_items > m->output_multiple ()){
+ noutput_items /= 2;
+ noutput_items = round_up (noutput_items, m->output_multiple ());
+ goto try_again;
+ }
+
+ // We're blocked on input
+ LOG(*d_log << " BLKD_IN\n");
+ if(d->input(i)->done()) // If the upstream block is done, we're done
+ goto were_done;
+
+ // Is it possible to ever fulfill this request?
+ if(ninput_items_required[i] > d->input(i)->max_possible_items_available ()) {
+ // Nope, never going to happen...
+ std::cerr << "\nsched: <block " << m->name()
+ << " (" << m->unique_id() << ")>"
+ << " is requesting more input data\n"
+ << " than we can provide.\n"
+ << " ninput_items_required = "
+ << ninput_items_required[i] << "\n"
+ << " max_possible_items_available = "
+ << d->input(i)->max_possible_items_available() << "\n"
+ << " If this is a filter, consider reducing the number of taps.\n";
+ goto were_done;
+ }
+
+ goto next_block;
+ }
+
+ // We've got enough data on each input to produce noutput_items.
+ // Finish setting up the call to work.
+ for(int i = 0; i < d->ninputs (); i++)
+ input_items[i] = d->input(i)->read_pointer();
+
+ setup_call_to_work:
+
+ for(int i = 0; i < d->noutputs (); i++)
+ output_items[i] = d->output(i)->write_pointer();
+
+ // Do the actual work of the block
+ int n = m->general_work(noutput_items, ninput_items,
+ input_items, output_items);
+ LOG(*d_log << " general_work: noutput_items = " << noutput_items
+ << " result = " << n << std::endl);
+
+ if(n == -1) // block is done
+ goto were_done;
+
+ d->produce_each(n); // advance write pointers
+ if(n > 0)
+ making_progress = true;
+
+ goto next_block;
+ }
+ assert(0);
+
+ were_done:
+ LOG(*d_log << " were_done\n");
+ d->set_done (true);
+ nalive--;
+
+ next_block:
+ if(++bi >= d_blocks.size ()) {
+ bi = 0;
+ made_progress_last_pass = making_progress;
+ making_progress = false;
+ }
+ }
+
+ for(unsigned i = 0; i < d_blocks.size(); i++) // disable any drivers, etc.
+ d_blocks[i]->stop();
+ }
+
+} /* namespace gr */